Skip to content

Commit

Permalink
clean remote txn when stop cn (#21321)
Browse files Browse the repository at this point in the history
clean remote txn when stop cn

Approved by: @zhangxu19830126, @sukki37
  • Loading branch information
iamlinjunhong authored Jan 22, 2025
1 parent 42aee9e commit 53a58c3
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/lockservice/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
defaultKeepRemoteLockDuration = time.Second
defaultKeepBindTimeout = time.Second * 10
defaultRemoteLockTimeout = time.Minute * 10
defaultRemoteTxnTimeout = time.Second * 10
)

// Config lock service config
Expand Down
28 changes: 28 additions & 0 deletions pkg/lockservice/service_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,10 @@ func (s *service) unlockTimeoutRemoteTxn(ctx context.Context) {
timer := time.NewTimer(wait)
defer timer.Stop()

timeout := defaultRemoteTxnTimeout
txnTimer := time.NewTimer(timeout)
defer txnTimer.Stop()

var timeoutTxns [][]byte
timeoutServices := make(map[string]struct{})
for {
Expand All @@ -497,6 +501,30 @@ func (s *service) unlockTimeoutRemoteTxn(ctx context.Context) {
}

timer.Reset(wait)
case <-txnTimer.C:
s.checkRemoteTxnTimeout(ctx)
txnTimer.Reset(timeout)
}
}
}

// checkRemoteTxnTimeout walks every txn id reported by the active txn holder
// and unlocks those that carry a non-empty remoteService and that
// isValidRemoteTxn no longer accepts.
//
// The scan is skipped entirely while the service status is
// Status_ServiceLockEnable.
func (s *service) checkRemoteTxnTimeout(ctx context.Context) {
	if s.isStatus(pb.Status_ServiceLockEnable) {
		return
	}

	for _, t := range s.activeTxnHolder.getAllTxnID() {
		txn := s.activeTxnHolder.getActiveTxn(t, false, "")
		if txn == nil {
			// The id list was collected by a separate call, so the txn may
			// have been released in between; dereferencing it would panic.
			// NOTE(review): assumes getActiveTxn returns nil on a miss when
			// its second argument is false — confirm against the holder.
			continue
		}

		createOn := txn.remoteService
		if len(createOn) == 0 {
			// No remote service recorded for this txn; nothing to validate.
			continue
		}

		waitTxn := pb.WaitTxn{TxnID: t, CreatedOn: createOn}
		if s.activeTxnHolder.isValidRemoteTxn(waitTxn) {
			continue
		}

		s.logger.Warn("found timeout txn",
			bytesArrayField("txn", [][]byte{t}))
		// Best-effort cleanup: the error is intentionally dropped so one
		// failing unlock does not stop the remaining txns from being checked.
		_ = s.Unlock(ctx, t, timestamp.Timestamp{})
	}
}
Expand Down
71 changes: 71 additions & 0 deletions pkg/lockservice/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2269,6 +2269,77 @@ func TestReLockSuccWithReStartCN(t *testing.T) {
)
}

// TestCheckRemoteTxnTimeout verifies that checkRemoteTxnTimeout empties the
// active txn holder of s2 once s2 has been marked for restart and has reached
// the Status_ServiceLockWaiting status.
func TestCheckRemoteTxnTimeout(t *testing.T) {
	runLockServiceTestsWithLevel(
		t,
		zapcore.DebugLevel,
		[]string{"s1", "s2"},
		time.Second,
		func(alloc *lockTableAllocator, s []*service) {
			first, second := s[0], s[1]

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			opts := pb.LockOptions{
				Granularity: pb.Granularity_Row,
				Mode:        pb.LockMode_Exclusive,
				Policy:      pb.WaitPolicy_Wait,
			}
			rows := [][]byte{{1}}
			txn1 := []byte("txn1")
			txn2 := []byte("txn2")

			// table 0 in s2
			_, err := second.Lock(ctx, 0, rows, txn1, opts)
			require.NoError(t, err)
			require.NoError(t, second.Unlock(ctx, txn1, timestamp.Timestamp{}))

			// table 1 in s1
			_, err = first.Lock(ctx, 1, rows, txn2, opts)
			require.NoError(t, err)
			// txn2 additionally locks table 0 through s1.
			_, err = first.Lock(ctx, 0, rows, txn2, opts)
			require.NoError(t, err)

			// Drop every lock table known to s1 before unlocking txn2 there.
			// NOTE(review): presumably this keeps the unlock from reaching
			// s2, which is why s2 still holds an active txn below — confirm.
			first.tableGroups.removeWithFilter(func(uint64, lockTable) bool {
				return true
			})
			require.NoError(t, first.Unlock(ctx, txn2, timestamp.Timestamp{}))

			require.False(t, second.activeTxnHolder.empty())

			alloc.setRestartService("s2")
			// Poll until s2 reports the waiting status, bailing out when the
			// test context expires.
			for !second.isStatus(pb.Status_ServiceLockWaiting) {
				select {
				case <-ctx.Done():
					panic("timeout bug")
				default:
				}
			}

			second.checkRemoteTxnTimeout(ctx)
			require.True(t, second.activeTxnHolder.empty())
		},
		nil,
	)
}

func TestReLockSuccWithKeepBindTimeout(t *testing.T) {
runLockServiceTestsWithLevel(
t,
Expand Down

0 comments on commit 53a58c3

Please sign in to comment.