Skip to content

Commit

Permalink
refactor(lcnode): refactor some code about lcnode task manager
Browse files Browse the repository at this point in the history
Signed-off-by: zhaochenyang <[email protected]>
  • Loading branch information
honeyvinnie authored and tangdeyi committed Aug 31, 2023
1 parent b24525a commit 95c3c12
Show file tree
Hide file tree
Showing 11 changed files with 365 additions and 371 deletions.
6 changes: 2 additions & 4 deletions lcnode/lc_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ func (l *LcNode) opMasterHeartbeat(conn net.Conn, p *proto.Packet, remoteAddr st
info := &proto.SnapshotVerDelTaskResponse{
ID: scanner.ID,
SnapshotStatistics: proto.SnapshotStatistics{
VerInfo: proto.VerInfo{
VolName: scanner.Volume,
VerSeq: scanner.getTaskVerSeq(),
},
VolName: scanner.Volume,
VerSeq: scanner.getTaskVerSeq(),
TotalInodeNum: atomic.LoadInt64(&scanner.currentStat.TotalInodeNum),
FileNum: atomic.LoadInt64(&scanner.currentStat.FileNum),
DirNum: atomic.LoadInt64(&scanner.currentStat.DirNum),
Expand Down
48 changes: 24 additions & 24 deletions lcnode/snapshot_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewSnapshotScanner(adminTask *proto.AdminTask, l *LcNode) (*SnapshotScanner
}

scanner := &SnapshotScanner{
ID: request.Task.Key(),
ID: request.Task.Id,
Volume: request.Task.VolName,
mw: metaWrapper,
lcnode: l,
Expand All @@ -83,7 +83,7 @@ func (l *LcNode) startSnapshotScan(adminTask *proto.AdminTask) (err error) {
adminTask.Response = response

l.scannerMutex.Lock()
if _, ok := l.snapshotScanners[request.Task.Key()]; ok {
if _, ok := l.snapshotScanners[request.Task.Id]; ok {
log.LogInfof("startSnapshotScan: scan task(%v) is already running!", request.Task)
l.scannerMutex.Unlock()
return
Expand All @@ -98,15 +98,15 @@ func (l *LcNode) startSnapshotScan(adminTask *proto.AdminTask) (err error) {
l.scannerMutex.Unlock()
return
}
l.snapshotScanners[request.Task.Key()] = scanner
l.snapshotScanners[scanner.ID] = scanner
l.scannerMutex.Unlock()

go scanner.Start()
return
}

func (s *SnapshotScanner) getTaskVerSeq() uint64 {
return s.verDelReq.Task.VerSeq
return s.verDelReq.Task.VolVersionInfo.Ver
}

func (s *SnapshotScanner) Stop() {
Expand Down Expand Up @@ -203,28 +203,28 @@ func (s *SnapshotScanner) handlVerDelDepthFirst(dentry *proto.ScanDentry) {
done := false

for !done {
children, err = s.mw.ReadDirLimitByVer(dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), onlyDir)
children, err = s.mw.ReadDirLimitByVer(dentry.Inode, marker, uint64(defaultReadDirLimit), s.getTaskVerSeq(), onlyDir)
if err != nil && err != syscall.ENOENT {
log.LogErrorf("action[handlVerDelDepthFirst] ReadDirLimitByVer failed, parent[%v] maker[%v] limit[%v] verSeq[%v] err[%v]",
dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), err)
log.LogErrorf("action[handlVerDelDepthFirst] ReadDirLimitByVer failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
dentry.Inode, marker, s.getTaskVerSeq(), err)
return
}
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimitByVer parent[%v] maker[%v] limit[%v] verSeq[%v] children[%v]",
dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), len(children))
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimitByVer parent[%v] maker[%v] verSeq[%v] children[%v]",
dentry.Inode, marker, s.getTaskVerSeq(), len(children))

if err == syscall.ENOENT {
done = true
log.LogErrorf("action[handlVerDelDepthFirst] ReadDirLimitByVer failed, parent[%v] maker[%v] limit[%v] verSeq[%v] err[%v]",
dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), err)
log.LogErrorf("action[handlVerDelDepthFirst] ReadDirLimitByVer failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
dentry.Inode, marker, s.getTaskVerSeq(), err)
break
}

if marker != "" {
if len(children) >= 1 && marker == children[0].Name {
if len(children) <= 1 {
done = true
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll done, parent[%v] maker[%v] limit[%v] verSeq[%v] children[%v]",
dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), children)
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll done, parent[%v] maker[%v] verSeq[%v] children[%v]",
dentry.Inode, marker, s.getTaskVerSeq(), children)
break
} else {
skippedChild := children[0]
Expand Down Expand Up @@ -270,7 +270,7 @@ func (s *SnapshotScanner) handlVerDelDepthFirst(dentry *proto.ScanDentry) {
}

childrenNr := len(children)
if (marker == "" && childrenNr < snapshotRoutineNumPerTask) || (marker != "" && childrenNr+1 < snapshotRoutineNumPerTask) {
if (marker == "" && childrenNr < defaultReadDirLimit) || (marker != "" && childrenNr+1 < defaultReadDirLimit) {
log.LogDebugf("action[handlVerDelDepthFirst] ReadDirLimit_ll done, parent[%v]",
dentry.Inode)
done = true
Expand Down Expand Up @@ -317,28 +317,28 @@ func (s *SnapshotScanner) handlVerDelBreadthFirst(dentry *proto.ScanDentry) {
done := false

for !done {
children, err = s.mw.ReadDirLimitByVer(dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), false)
children, err = s.mw.ReadDirLimitByVer(dentry.Inode, marker, uint64(defaultReadDirLimit), s.getTaskVerSeq(), false)
if err != nil && err != syscall.ENOENT {
log.LogErrorf("action[handlVerDelBreadthFirst] ReadDirLimitByVer failed, parent[%v] maker[%v] limit[%v] verSeq[%v] err[%v]",
dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), err)
log.LogErrorf("action[handlVerDelBreadthFirst] ReadDirLimitByVer failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
dentry.Inode, marker, s.getTaskVerSeq(), err)
return
}
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimitByVer parent[%v] maker[%v] limit[%v] verSeq[%v] children[%v]",
dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), len(children))
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimitByVer parent[%v] maker[%v] verSeq[%v] children[%v]",
dentry.Inode, marker, s.getTaskVerSeq(), len(children))

if err == syscall.ENOENT {
done = true
log.LogErrorf("action[handlVerDelBreadthFirst] ReadDirLimitByVer failed, parent[%v] maker[%v] limit[%v] verSeq[%v] err[%v]",
dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), err)
log.LogErrorf("action[handlVerDelBreadthFirst] ReadDirLimitByVer failed, parent[%v] maker[%v] verSeq[%v] err[%v]",
dentry.Inode, marker, s.getTaskVerSeq(), err)
break
}

if marker != "" {
if len(children) >= 1 && marker == children[0].Name {
if len(children) <= 1 {
done = true
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll done, parent[%v] maker[%v] limit[%v] verSeq[%v] children[%v]",
dentry.Inode, marker, uint64(snapshotRoutineNumPerTask), s.getTaskVerSeq(), children)
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll done, parent[%v] maker[%v] verSeq[%v] children[%v]",
dentry.Inode, marker, s.getTaskVerSeq(), children)
break
} else {
skippedChild := children[0]
Expand Down Expand Up @@ -382,7 +382,7 @@ func (s *SnapshotScanner) handlVerDelBreadthFirst(dentry *proto.ScanDentry) {
}
scanDentries = scanDentries[:0]
childrenNr := len(children)
if (marker == "" && childrenNr < snapshotRoutineNumPerTask) || (marker != "" && childrenNr+1 < snapshotRoutineNumPerTask) {
if (marker == "" && childrenNr < defaultReadDirLimit) || (marker != "" && childrenNr+1 < defaultReadDirLimit) {
log.LogDebugf("action[handlVerDelBreadthFirst] ReadDirLimit_ll done, parent[%v] total childrenNr[%v] marker[%v]",
dentry.Inode, totalChildFileNum+totalChildDirNum, marker)
done = true
Expand Down
5 changes: 2 additions & 3 deletions lcnode/snapshot_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ func TestSnapshotScanner(t *testing.T) {
},
verDelReq: &proto.SnapshotVerDelTaskRequest{
Task: &proto.SnapshotVerDelTask{
VerInfo: proto.VerInfo{
VolName: "test_vol",
VerSeq: 1,
VolVersionInfo: &proto.VolVersionInfo{
Ver: 0,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion master/admin_task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (sender *AdminTaskManager) DelTask(t *proto.AdminTask) {
if !ok {
return
}
if t.OpCode != proto.OpMetaNodeHeartbeat && t.OpCode != proto.OpDataNodeHeartbeat {
if t.OpCode != proto.OpMetaNodeHeartbeat && t.OpCode != proto.OpDataNodeHeartbeat && t.OpCode != proto.OpLcNodeHeartbeat {
log.LogDebugf("action[DelTask] delete task[%v]", t.ToString())
}
delete(sender.TaskMap, t.ID)
Expand Down
49 changes: 26 additions & 23 deletions master/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4207,16 +4207,24 @@ errHandler:
return
}

type LcNodeStatInfo struct {
Addr string
TaskId string
}

type LcNodeInfoResponse struct {
Infos []*LcNodeStatInfo
LcConfigurations map[string]*proto.LcConfiguration
LcRuleTaskStatus *lcRuleTaskStatus
LcSnapshotVerStatus *lcSnapshotVerStatus
}

func (c *Cluster) getAllLcNodeInfo() (rsp *LcNodeInfoResponse, err error) {
rsp = &LcNodeInfoResponse{
Infos: make([]*LcNodeStatInfo, 0),
}
c.lcNodes.Range(func(addr, value interface{}) bool {
t := c.lcMgr.lcNodeStatus.getWorkingTask(addr.(string))
var TaskId string
if t != nil {
TaskId = t.(*proto.RuleTask).Id
}
TaskId := c.lcMgr.lcNodeStatus.workingNodes[addr.(string)]
ln := &LcNodeStatInfo{
Addr: addr.(string),
TaskId: TaskId,
Expand All @@ -4226,7 +4234,7 @@ func (c *Cluster) getAllLcNodeInfo() (rsp *LcNodeInfoResponse, err error) {
})
rsp.LcConfigurations = c.lcMgr.lcConfigurations
rsp.LcRuleTaskStatus = c.lcMgr.lcRuleTaskStatus
rsp.SnapshotInfos = c.snapshotMgr.volVerInfos
rsp.LcSnapshotVerStatus = c.snapshotMgr.lcSnapshotTaskStatus
return
}

Expand All @@ -4240,14 +4248,9 @@ func (c *Cluster) clearLcNodes() {
}

func (c *Cluster) delLcNode(nodeAddr string) (err error) {
t := c.lcMgr.lcNodeStatus.removeNode(nodeAddr)
if t != nil {
c.lcMgr.lcRuleTaskStatus.redoTask(t.(*proto.RuleTask))
}
t = c.snapshotMgr.lcNodeStatus.removeNode(nodeAddr)
if t != nil {
c.snapshotMgr.volVerInfos.RedoProcessingVerInfo(t.(string))
}
c.lcMgr.lcRuleTaskStatus.RedoTask(c.lcMgr.lcNodeStatus.RemoveNode(nodeAddr))
c.snapshotMgr.lcSnapshotTaskStatus.RedoTask(c.snapshotMgr.lcNodeStatus.RemoveNode(nodeAddr))

lcNode, err := c.lcNode(nodeAddr)
if err != nil {
log.LogErrorf("action[delLcNode], clusterID:%v, lcNodeAddr:%v, load err:%v ", c.Name, nodeAddr, err)
Expand Down Expand Up @@ -4312,7 +4315,7 @@ func (c *Cluster) scheduleToSnapshotDelVerScan() {

func (c *Cluster) getSnapshotDelVer() {
if c.partition == nil || !c.partition.IsRaftLeader() {
log.LogWarn("getDeletingSnapshotVer: master is not leader")
log.LogWarn("getSnapshotDelVer: master is not leader")
return
}

Expand All @@ -4321,18 +4324,18 @@ func (c *Cluster) getSnapshotDelVer() {
volVerInfoList := vol.VersionMgr.getVersionList()
for _, volVerInfo := range volVerInfoList.VerList {
if volVerInfo.Status == proto.VersionDeleting {
verInfo := &LcVerInfo{
VerInfo: proto.VerInfo{
VolName: volName,
VerSeq: volVerInfo.Ver,
},
dTime: time.UnixMicro(volVerInfo.DelTime),
task := &proto.SnapshotVerDelTask{
Id: fmt.Sprintf("%s:%d", volName, volVerInfo.Ver),
VolName: volName,
VolVersionInfo: volVerInfo,
}
c.snapshotMgr.volVerInfos.AddVerInfo(verInfo)
c.snapshotMgr.lcSnapshotTaskStatus.AddVerInfo(task)
}
}
}
log.LogDebug("getDeletingSnapshotVer finish")
log.LogDebug("getSnapshotDelVer AddVerInfo finish")
c.snapshotMgr.lcSnapshotTaskStatus.DeleteOldResult()
log.LogDebug("getSnapshotDelVer DeleteOldResult finish")
}

func (c *Cluster) checkSnapshotStrategy() {
Expand Down
Loading

0 comments on commit 95c3c12

Please sign in to comment.