Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: State trans error in concurrent Release and Watching #38591

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions internal/datacoord/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,15 @@
return c
}

func (c *StateChannel) TransitionOnSuccess() {
func (c *StateChannel) TransitionOnSuccess(opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on success but opID not match, stay original state ",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}
switch c.currentState {
case Standby:
c.setState(ToWatch)
Expand All @@ -208,7 +216,15 @@
}
}

func (c *StateChannel) TransitionOnFailure() {
func (c *StateChannel) TransitionOnFailure(opID int64) {
if opID != c.Info.GetOpID() {
log.Warn("Try to transit on failure but opID not match, stay original state",
zap.Any("currentState", c.currentState),
zap.String("channel", c.Name),
zap.Int64("target opID", opID),
zap.Int64("channel opID", c.Info.GetOpID()))
return
}

Check warning on line 227 in internal/datacoord/channel.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/channel.go#L221-L227

Added lines #L221 - L227 were not covered by tests
switch c.currentState {
case Watching:
c.setState(Standby)
Expand Down
43 changes: 28 additions & 15 deletions internal/datacoord/channel_manager_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,9 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies
continue
}
nodeID := nodeAssign.NodeID

var (
succeededChannels = make([]RWChannel, 0, channelCount)
failedChannels = make([]RWChannel, 0, channelCount)
succeededChannels = 0
failedChannels = 0
futures = make([]*conc.Future[any], 0, channelCount)
)

Expand All @@ -530,31 +529,42 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies

future := getOrCreateIOPool().Submit(func() (any, error) {
err := m.Notify(ctx, nodeID, tmpWatchInfo)
return innerCh, err
return poolResult{
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, err
})
futures = append(futures, future)
}

for _, f := range futures {
ch, err := f.Await()
got, err := f.Await()
res := got.(poolResult)

if err != nil {
failedChannels = append(failedChannels, ch.(RWChannel))
log.Ctx(ctx).Warn("Failed to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.String("channel name", res.ch.GetName()),
zap.Error(err),
)
failedChannels++
} else {
succeededChannels = append(succeededChannels, ch.(RWChannel))
succeededChannels++
advanced = true
}

m.mu.Lock()
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
m.mu.Unlock()
}

log.Info("Finish to notify channel operations to datanode",
zap.Int64("assignment", nodeAssign.NodeID),
zap.Int("operation count", channelCount),
zap.Int("success count", len(succeededChannels)),
zap.Int("failure count", len(failedChannels)),
zap.Int("success count", succeededChannels),
zap.Int("failure count", failedChannels),
)
m.mu.Lock()
m.store.UpdateState(false, failedChannels...)
m.store.UpdateState(true, succeededChannels...)
m.mu.Unlock()
}

return advanced
Expand All @@ -563,6 +573,7 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies
type poolResult struct {
successful bool
ch RWChannel
opID int64
}

func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
Expand All @@ -583,13 +594,15 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*

for _, ch := range nodeAssign.Channels {
innerCh := ch
tmpWatchInfo := proto.Clone(innerCh.GetWatchInfo()).(*datapb.ChannelWatchInfo)

future := getOrCreateIOPool().Submit(func() (any, error) {
successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo())
successful, got := m.Check(ctx, nodeID, tmpWatchInfo)
if got {
return poolResult{
successful: successful,
ch: innerCh,
opID: tmpWatchInfo.GetOpID(),
}, nil
}
return nil, errors.New("Got results with no progress")
Expand All @@ -602,7 +615,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*
if err == nil {
m.mu.Lock()
result := got.(poolResult)
m.store.UpdateState(result.successful, result.ch)
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
m.mu.Unlock()

advanced = true
Expand Down
42 changes: 42 additions & 0 deletions internal/datacoord/channel_manager_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,48 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)
})
s.Run("advance watching channels released during check", func() {
idx := int64(19530)
mockAlloc := globalIDAllocator.NewMockGlobalIDAllocator(s.T())
mockAlloc.EXPECT().AllocOne().
RunAndReturn(func() (int64, error) {
idx++
return idx, nil
})
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)

m.AdvanceChannelState(ctx)
s.checkAssignment(m, 1, "ch1", Watching)
s.checkAssignment(m, 1, "ch2", Watching)

// Release belfore check return
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) {
if info.GetVchan().GetChannelName() == "ch1" {
m.Release(1, "ch1")
s.checkAssignment(m, 1, "ch1", ToRelease)
rwChannel, found := m.GetChannel(nodeID, "ch1")
s.True(found)
metaInfo := rwChannel.GetWatchInfo()
s.Require().EqualValues(metaInfo.GetOpID(), 19531)
log.Info("Trying to check this info", zap.Any("meta info", rwChannel.GetWatchInfo()))
}
log.Info("Trying to check this info", zap.Any("rpc info", info))
return &datapb.ChannelOperationProgressResponse{State: datapb.ChannelWatchState_WatchSuccess, Progress: 100}, nil
}).Twice()
m.AdvanceChannelState(ctx)

s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", Watched)
})
s.Run("advance watching channels check ErrNodeNotFound", func() {
chNodes := map[string]int64{
"ch1": 1,
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/channel_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
Update(op *ChannelOpSet) error

// UpdateState is used by StateChannelStore only
UpdateState(isSuccessful bool, channels ...RWChannel)
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
// SegLegacyChannelByNode is used by StateChannelStore only
SetLegacyChannelByNode(nodeIDs ...int64)
}
Expand Down Expand Up @@ -565,7 +565,7 @@
return nil
}

func (c *ChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
func (c *ChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {

Check warning on line 568 in internal/datacoord/channel_store.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/channel_store.go#L568

Added line #L568 was not covered by tests
log.Error("ChannelStore doesn't implement UpdateState")
}

Expand Down
19 changes: 9 additions & 10 deletions internal/datacoord/channel_store_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,17 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
}
}

func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
lo.ForEach(channels, func(ch RWChannel, _ int) {
for _, cInfo := range c.channelsInfo {
if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess()
} else {
stateChannel.(*StateChannel).TransitionOnFailure()
}
func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
channelName := channel.GetName()
if cInfo, ok := c.channelsInfo[nodeID]; ok {
if stateChannel, ok := cInfo.Channels[channelName]; ok {
if isSuccessful {
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
} else {
stateChannel.(*StateChannel).TransitionOnFailure(opID)
}
}
})
}
}

func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) {
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/channel_store_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,14 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
ch := "ch-1"
channel := NewStateChannel(getChannel(ch, 1))
channel.setState(test.inChannelState)
store.channelsInfo[1] = &NodeChannelInfo{
store.channelsInfo[bufferID] = &NodeChannelInfo{
NodeID: bufferID,
Channels: map[string]RWChannel{
ch: channel,
},
}

store.UpdateState(test.inSuccess, channel)
store.UpdateState(test.inSuccess, bufferID, channel, 0)
s.Equal(test.outChannelState, channel.currentState)
})
}
Expand Down
34 changes: 11 additions & 23 deletions internal/datacoord/mock_channel_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading