Skip to content

Commit

Permalink
fix: ChannelManager concurret Release and Watch bug
Browse files Browse the repository at this point in the history
See also: milvus-io#38589

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Dec 19, 2024
1 parent 3d360c0 commit 327d2c6
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 52 deletions.
20 changes: 18 additions & 2 deletions internal/datacoord/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,15 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St
return c
}

func (c *StateChannel) TransitionOnSuccess() {
func (c *StateChannel) TransitionOnSuccess(opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on success but opID not match, stay original state ",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}
switch c.currentState {
case Standby:
c.setState(ToWatch)
Expand All @@ -208,7 +216,15 @@ func (c *StateChannel) TransitionOnSuccess() {
}
}

func (c *StateChannel) TransitionOnFailure() {
func (c *StateChannel) TransitionOnFailure(opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on failure but opID not match, stay original state",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}
switch c.currentState {
case Watching:
c.setState(Standby)
Expand Down
46 changes: 31 additions & 15 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
nodeID := nodeAssign.NodeID

var (
succeededChannels = make([]RWChannel, 0, channelCount)
failedChannels = make([]RWChannel, 0, channelCount)
succeededChannels = 0
failedChannels = 0
futures = make([]*conc.Future[any], 0, channelCount)
)

Expand All @@ -564,31 +564,42 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [

future := getOrCreateIOPool().Submit(func() (any, error) {
err := m.Notify(ctx, nodeID, tmpWatchInfo)
return innerCh, err
return poolResult{
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, err
})
futures = append(futures, future)
}

for _, f := range futures {
ch, err := f.Await()
got, err := f.Await()
res := got.(poolResult)

if err != nil {
failedChannels = append(failedChannels, ch.(RWChannel))
log.Ctx(ctx).Warn("Failed to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.String("channel name", res.ch.GetName()),
zap.Error(err),
)
failedChannels++
} else {
succeededChannels = append(succeededChannels, ch.(RWChannel))
succeededChannels++
advanced = true
}

m.mu.Lock()
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
m.mu.Unlock()
}

log.Ctx(ctx).Info("Finish to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.Int("success count", len(succeededChannels)),
zap.Int("failure count", len(failedChannels)),
zap.Int("success count", succeededChannels),
zap.Int("failure count", failedChannels),
)
m.mu.Lock()
m.store.UpdateState(false, failedChannels...)
m.store.UpdateState(true, succeededChannels...)
m.mu.Unlock()
}

return advanced
Expand All @@ -597,6 +608,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
type poolResult struct {
successful bool
ch RWChannel
opID int64
}

func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
Expand All @@ -617,13 +629,15 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No

for _, ch := range nodeAssign.Channels {
innerCh := ch
tmpWatchInfo := typeutil.Clone(innerCh.GetWatchInfo())

future := getOrCreateIOPool().Submit(func() (any, error) {
successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo())
successful, got := m.Check(ctx, nodeID, tmpWatchInfo)
if got {
return poolResult{
successful: successful,
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, nil
}
return nil, errors.New("Got results with no progress")
Expand All @@ -633,10 +647,10 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No

for _, f := range futures {
got, err := f.Await()
result := got.(poolResult)
if err == nil {
m.mu.Lock()
result := got.(poolResult)
m.store.UpdateState(result.successful, result.ch)
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
m.mu.Unlock()

advanced = true
Expand All @@ -656,6 +670,7 @@ func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *dat
zap.String("channel", info.GetVchan().GetChannelName()),
zap.Int64("assignment", nodeID),
zap.String("operation", info.GetState().String()),
zap.Int64("opID", info.GetOpID()),
)
log.Info("Notify channel operation")
err := m.subCluster.NotifyChannelOperation(ctx, nodeID, &datapb.ChannelOperationsRequest{Infos: []*datapb.ChannelWatchInfo{info}})
Expand Down Expand Up @@ -706,6 +721,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *data
if resp.GetState() == datapb.ChannelWatchState_ReleaseFailure {
return false, true
}

}
return false, false
}
Expand Down
44 changes: 44 additions & 0 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,50 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)
})

s.Run("advance watching channels released during check", func() {
idx := int64(19530)
mockAlloc := globalIDAllocator.NewMockGlobalIDAllocator(s.T())
mockAlloc.EXPECT().AllocOne().
RunAndReturn(func() (int64, error) {
idx++
return idx, nil
})
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)

m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)

// Release belfore check return
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) {
if info.GetVchan().GetChannelName() == "ch1" {
m.Release(1, "ch1")
s.checkAssignment(m, 1, "ch1", ToRelease)
rwChannel, found := m.GetChannel(nodeID, "ch1")
s.True(found)
metaInfo := rwChannel.GetWatchInfo()
s.Require().EqualValues(metaInfo.GetOpID(), 19531)
log.Info("Trying to check this info", zap.Any("meta info", rwChannel.GetWatchInfo()))
}
log.Info("Trying to check this info", zap.Any("rpc info", info))
return &datapb.ChannelOperationProgressResponse{State: datapb.ChannelWatchState_WatchSuccess, Progress: 100}, nil
}).Twice()
m.AdvanceChannelState(ctx)

s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", Watched)
})

s.Run("advance watching channels check ErrNodeNotFound", func() {
chNodes := map[string]int64{
"ch1": 1,
Expand Down
21 changes: 10 additions & 11 deletions internal/datacoord/channel_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type RWChannelStore interface {
Update(op *ChannelOpSet) error

// UpdateState is used by StateChannelStore only
UpdateState(isSuccessful bool, channels ...RWChannel)
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
// SegLegacyChannelByNode is used by StateChannelStore only
SetLegacyChannelByNode(nodeIDs ...int64)

Expand Down Expand Up @@ -375,18 +375,17 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
}
}

func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
lo.ForEach(channels, func(ch RWChannel, _ int) {
for _, cInfo := range c.channelsInfo {
if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess()
} else {
stateChannel.(*StateChannel).TransitionOnFailure()
}
func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
channelName := channel.GetName()
if cInfo, ok := c.channelsInfo[nodeID]; ok {
if stateChannel, ok := cInfo.Channels[channelName]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
} else {
stateChannel.(*StateChannel).TransitionOnFailure(opID)
}
}
})
}
}

func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) {
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/channel_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
},
}

store.UpdateState(test.inSuccess, channel)
store.UpdateState(test.inSuccess, bufferID, channel, 0)
s.Equal(test.outChannelState, channel.currentState)
})
}
Expand Down
34 changes: 11 additions & 23 deletions internal/datacoord/mock_channel_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 327d2c6

Please sign in to comment.