Skip to content

Commit c0b855d

Browse files
authored
fix: ChannelManager concurret Release and Watch bug (#38590)
See also: #38589 Signed-off-by: yangxuan <[email protected]>
1 parent 8fcb33c commit c0b855d

File tree

6 files changed

+115
-52
lines changed

6 files changed

+115
-52
lines changed

internal/datacoord/channel.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,15 @@ func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *St
191191
return c
192192
}
193193

194-
func (c *StateChannel) TransitionOnSuccess() {
194+
func (c *StateChannel) TransitionOnSuccess(opID int64) {
195+
if opID != c.Info.GetOpID() {
196+
log.Warn("Try to transit on success but opID not match, stay original state ",
197+
zap.Any("currentState", c.currentState),
198+
zap.String("channel", c.Name),
199+
zap.Int64("target opID", opID),
200+
zap.Int64("channel opID", c.Info.GetOpID()))
201+
return
202+
}
195203
switch c.currentState {
196204
case Standby:
197205
c.setState(ToWatch)
@@ -208,7 +216,15 @@ func (c *StateChannel) TransitionOnSuccess() {
208216
}
209217
}
210218

211-
func (c *StateChannel) TransitionOnFailure() {
219+
func (c *StateChannel) TransitionOnFailure(opID int64) {
220+
if opID != c.Info.GetOpID() {
221+
log.Warn("Try to transit on failure but opID not match, stay original state",
222+
zap.Any("currentState", c.currentState),
223+
zap.String("channel", c.Name),
224+
zap.Int64("target opID", opID),
225+
zap.Int64("channel opID", c.Info.GetOpID()))
226+
return
227+
}
212228
switch c.currentState {
213229
case Watching:
214230
c.setState(Standby)

internal/datacoord/channel_manager.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,8 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
546546
nodeID := nodeAssign.NodeID
547547

548548
var (
549-
succeededChannels = make([]RWChannel, 0, channelCount)
550-
failedChannels = make([]RWChannel, 0, channelCount)
549+
succeededChannels = 0
550+
failedChannels = 0
551551
futures = make([]*conc.Future[any], 0, channelCount)
552552
)
553553

@@ -564,31 +564,42 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
564564

565565
future := getOrCreateIOPool().Submit(func() (any, error) {
566566
err := m.Notify(ctx, nodeID, tmpWatchInfo)
567-
return innerCh, err
567+
return poolResult{
568+
ch: innerCh,
569+
opID: tmpWatchInfo.GetOpID(),
570+
}, err
568571
})
569572
futures = append(futures, future)
570573
}
571574

572575
for _, f := range futures {
573-
ch, err := f.Await()
576+
got, err := f.Await()
577+
res := got.(poolResult)
578+
574579
if err != nil {
575-
failedChannels = append(failedChannels, ch.(RWChannel))
580+
log.Ctx(ctx).Warn("Failed to notify channel operations to datanode",
581+
zap.Int64("assignment", nodeAssign.NodeID),
582+
zap.Int("operation count", channelCount),
583+
zap.String("channel name", res.ch.GetName()),
584+
zap.Error(err),
585+
)
586+
failedChannels++
576587
} else {
577-
succeededChannels = append(succeededChannels, ch.(RWChannel))
588+
succeededChannels++
578589
advanced = true
579590
}
591+
592+
m.mu.Lock()
593+
m.store.UpdateState(err == nil, nodeID, res.ch, res.opID)
594+
m.mu.Unlock()
580595
}
581596

582597
log.Ctx(ctx).Info("Finish to notify channel operations to datanode",
583598
zap.Int64("assignment", nodeAssign.NodeID),
584599
zap.Int("operation count", channelCount),
585-
zap.Int("success count", len(succeededChannels)),
586-
zap.Int("failure count", len(failedChannels)),
600+
zap.Int("success count", succeededChannels),
601+
zap.Int("failure count", failedChannels),
587602
)
588-
m.mu.Lock()
589-
m.store.UpdateState(false, failedChannels...)
590-
m.store.UpdateState(true, succeededChannels...)
591-
m.mu.Unlock()
592603
}
593604

594605
return advanced
@@ -597,6 +608,7 @@ func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies [
597608
type poolResult struct {
598609
successful bool
599610
ch RWChannel
611+
opID int64
600612
}
601613

602614
func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
@@ -617,13 +629,15 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
617629

618630
for _, ch := range nodeAssign.Channels {
619631
innerCh := ch
632+
tmpWatchInfo := typeutil.Clone(innerCh.GetWatchInfo())
620633

621634
future := getOrCreateIOPool().Submit(func() (any, error) {
622-
successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo())
635+
successful, got := m.Check(ctx, nodeID, tmpWatchInfo)
623636
if got {
624637
return poolResult{
625638
successful: successful,
626639
ch: innerCh,
640+
opID: tmpWatchInfo.GetOpID(),
627641
}, nil
628642
}
629643
return nil, errors.New("Got results with no progress")
@@ -636,7 +650,7 @@ func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*No
636650
if err == nil {
637651
m.mu.Lock()
638652
result := got.(poolResult)
639-
m.store.UpdateState(result.successful, result.ch)
653+
m.store.UpdateState(result.successful, nodeID, result.ch, result.opID)
640654
m.mu.Unlock()
641655

642656
advanced = true
@@ -656,6 +670,7 @@ func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *dat
656670
zap.String("channel", info.GetVchan().GetChannelName()),
657671
zap.Int64("assignment", nodeID),
658672
zap.String("operation", info.GetState().String()),
673+
zap.Int64("opID", info.GetOpID()),
659674
)
660675
log.Info("Notify channel operation")
661676
err := m.subCluster.NotifyChannelOperation(ctx, nodeID, &datapb.ChannelOperationsRequest{Infos: []*datapb.ChannelWatchInfo{info}})
@@ -706,6 +721,7 @@ func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *data
706721
if resp.GetState() == datapb.ChannelWatchState_ReleaseFailure {
707722
return false, true
708723
}
724+
709725
}
710726
return false, false
711727
}

internal/datacoord/channel_manager_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,50 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
450450
s.checkAssignment(m, 1, "ch1", Watching)
451451
s.checkAssignment(m, 1, "ch2", Watching)
452452
})
453+
454+
s.Run("advance watching channels released during check", func() {
455+
idx := int64(19530)
456+
mockAlloc := globalIDAllocator.NewMockGlobalIDAllocator(s.T())
457+
mockAlloc.EXPECT().AllocOne().
458+
RunAndReturn(func() (int64, error) {
459+
idx++
460+
return idx, nil
461+
})
462+
chNodes := map[string]int64{
463+
"ch1": 1,
464+
"ch2": 1,
465+
}
466+
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
467+
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
468+
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, mockAlloc)
469+
s.Require().NoError(err)
470+
s.checkAssignment(m, 1, "ch1", ToWatch)
471+
s.checkAssignment(m, 1, "ch2", ToWatch)
472+
473+
m.AdvanceChannelState(ctx)
474+
s.checkAssignment(m, 1, "ch1", Watching)
475+
s.checkAssignment(m, 1, "ch2", Watching)
476+
477+
// Release belfore check return
478+
s.mockCluster.EXPECT().CheckChannelOperationProgress(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error) {
479+
if info.GetVchan().GetChannelName() == "ch1" {
480+
m.Release(1, "ch1")
481+
s.checkAssignment(m, 1, "ch1", ToRelease)
482+
rwChannel, found := m.GetChannel(nodeID, "ch1")
483+
s.True(found)
484+
metaInfo := rwChannel.GetWatchInfo()
485+
s.Require().EqualValues(metaInfo.GetOpID(), 19531)
486+
log.Info("Trying to check this info", zap.Any("meta info", rwChannel.GetWatchInfo()))
487+
}
488+
log.Info("Trying to check this info", zap.Any("rpc info", info))
489+
return &datapb.ChannelOperationProgressResponse{State: datapb.ChannelWatchState_WatchSuccess, Progress: 100}, nil
490+
}).Twice()
491+
m.AdvanceChannelState(ctx)
492+
493+
s.checkAssignment(m, 1, "ch1", ToRelease)
494+
s.checkAssignment(m, 1, "ch2", Watched)
495+
})
496+
453497
s.Run("advance watching channels check ErrNodeNotFound", func() {
454498
chNodes := map[string]int64{
455499
"ch1": 1,

internal/datacoord/channel_store.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type RWChannelStore interface {
7272
Update(op *ChannelOpSet) error
7373

7474
// UpdateState is used by StateChannelStore only
75-
UpdateState(isSuccessful bool, channels ...RWChannel)
75+
UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64)
7676
// SegLegacyChannelByNode is used by StateChannelStore only
7777
SetLegacyChannelByNode(nodeIDs ...int64)
7878

@@ -375,18 +375,17 @@ func (c *StateChannelStore) AddNode(nodeID int64) {
375375
}
376376
}
377377

378-
func (c *StateChannelStore) UpdateState(isSuccessful bool, channels ...RWChannel) {
379-
lo.ForEach(channels, func(ch RWChannel, _ int) {
380-
for _, cInfo := range c.channelsInfo {
381-
if stateChannel, ok := cInfo.Channels[ch.GetName()]; ok {
382-
if isSuccessful {
383-
stateChannel.(*StateChannel).TransitionOnSuccess()
384-
} else {
385-
stateChannel.(*StateChannel).TransitionOnFailure()
386-
}
378+
func (c *StateChannelStore) UpdateState(isSuccessful bool, nodeID int64, channel RWChannel, opID int64) {
379+
channelName := channel.GetName()
380+
if cInfo, ok := c.channelsInfo[nodeID]; ok {
381+
if stateChannel, ok := cInfo.Channels[channelName]; ok {
382+
if isSuccessful {
383+
stateChannel.(*StateChannel).TransitionOnSuccess(opID)
384+
} else {
385+
stateChannel.(*StateChannel).TransitionOnFailure(opID)
387386
}
388387
}
389-
})
388+
}
390389
}
391390

392391
func (c *StateChannelStore) SetLegacyChannelByNode(nodeIDs ...int64) {

internal/datacoord/channel_store_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,14 +436,14 @@ func (s *StateChannelStoreSuite) TestUpdateState() {
436436
ch := "ch-1"
437437
channel := NewStateChannel(getChannel(ch, 1))
438438
channel.setState(test.inChannelState)
439-
store.channelsInfo[1] = &NodeChannelInfo{
439+
store.channelsInfo[bufferID] = &NodeChannelInfo{
440440
NodeID: bufferID,
441441
Channels: map[string]RWChannel{
442442
ch: channel,
443443
},
444444
}
445445

446-
store.UpdateState(test.inSuccess, channel)
446+
store.UpdateState(test.inSuccess, bufferID, channel, 0)
447447
s.Equal(test.outChannelState, channel.currentState)
448448
})
449449
}

internal/datacoord/mock_channel_store.go

Lines changed: 11 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)