Skip to content

Commit

Permalink
enhance(metanode):snapshot. version message from master need raft sub…
Browse files Browse the repository at this point in the history
…mit and quened

Signed-off-by: leonrayang <[email protected]>
  • Loading branch information
leonrayang committed Sep 11, 2023
1 parent 47842d5 commit 0e4a8ad
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 31 deletions.
1 change: 1 addition & 0 deletions metanode/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type metadataManager struct {
cpuUtil atomicutil.Float64
samplerDone chan struct{}
volUpdating *sync.Map //map[string]*verOp2Phase
verUpdateChan chan []byte
}

func (m *metadataManager) getPacketLabels(p *Packet) (labels map[string]string) {
Expand Down
4 changes: 2 additions & 2 deletions metanode/manager_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -2312,8 +2312,8 @@ func (m *metadataManager) commitCreateVersion(VolumeID string, VerSeq uint64, Op
if _, ok := partition.IsLeader(); !ok {
return true
}
log.LogInfof("action[commitCreateVersion] volume %v mp %v do MultiVersionOp verseq %v", VolumeID, id, VerSeq)
if err = partition.MultiVersionOp(Op, VerSeq, nil); err != nil {
log.LogInfof("action[commitCreateVersion] volume %v mp %v do HandleVersionOp verseq %v", VolumeID, id, VerSeq)
if err = partition.HandleVersionOp(Op, VerSeq, nil); err != nil {
return false
}
return true
Expand Down
10 changes: 9 additions & 1 deletion metanode/multi_ver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ func TestCheckVerList(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mp = mockPartitionRaftForTest(mockCtrl)

mp.verUpdateChan = make(chan []byte, 100)
mp.multiVersionList = &proto.VolVersionInfoList{
VerList: []*proto.VolVersionInfo{
{Ver: 20, Status: proto.VersionNormal},
Expand All @@ -1239,6 +1239,9 @@ func TestCheckVerList(t *testing.T) {
}

mp.checkVerList(masterList)
verData := <-mp.verUpdateChan
mp.submit(opFSMVersionOp, verData)

assert.True(t, mp.verSeq == 50)
assert.True(t, mp.multiVersionList.VerList[len(mp.multiVersionList.VerList)-1].Ver == 50)

Expand All @@ -1247,7 +1250,12 @@ func TestCheckVerList(t *testing.T) {
{Ver: 20, Status: proto.VersionNormal},
{Ver: 40, Status: proto.VersionNormal}},
}

mp.checkVerList(masterList)
verData = <-mp.verUpdateChan
mp.submit(opFSMVersionOp, verData)

assert.True(t, mp.verSeq == 40)
assert.True(t, len(mp.multiVersionList.VerList) == 2)
mp.stop()
}
58 changes: 35 additions & 23 deletions metanode/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ type OpMultipart interface {

// MultiVersion operation from master or client
type OpMultiVersion interface {
MultiVersionOp(op uint8, verSeq uint64, verList []*proto.VolVersionInfo) (err error)
HandleVersionOp(op uint8, verSeq uint64, verList []*proto.VolVersionInfo) (err error)
fsmVersionOp(reqData []byte) (err error)
GetAllVersionInfo(req *proto.MultiVersionOpRequest, p *Packet) (err error)
GetSpecVersionInfo(req *proto.MultiVersionOpRequest, p *Packet) (err error)
Expand Down Expand Up @@ -504,6 +504,7 @@ type metaPartition struct {
verSeq uint64
multiVersionList *proto.VolVersionInfoList
versionLock sync.Mutex
verUpdateChan chan []byte
}

func (mp *metaPartition) IsForbidden() bool {
Expand Down Expand Up @@ -649,6 +650,38 @@ func (mp *metaPartition) Stop() {
}
}

func (mp *metaPartition) versionInit(isCreate bool) (err error) {
var verList *proto.VolVersionInfoList
verList, err = masterClient.AdminAPI().GetVerList(mp.config.VolName)

if err != nil {
log.LogErrorf("action[onStart] GetVerList err[%v]", err)
return
}
if isCreate || len(mp.multiVersionList.VerList) == 0 {
for _, info := range verList.VerList {
if info.Status != proto.VersionNormal {
continue
}
mp.multiVersionList.VerList = append(mp.multiVersionList.VerList, info)
}

log.LogDebugf("action[onStart] verList %v", mp.multiVersionList.VerList)
if err = mp.storeInitMultiversion(); err != nil {
return
}
}

vlen := len(mp.multiVersionList.VerList)
if vlen > 0 {
mp.verSeq = mp.multiVersionList.VerList[vlen-1].Ver
}

go mp.runVersionOp()

return
}

func (mp *metaPartition) onStart(isCreate bool) (err error) {
defer func() {
if err == nil {
Expand Down Expand Up @@ -677,38 +710,17 @@ func (mp *metaPartition) onStart(isCreate bool) (err error) {

var (
volumeInfo *proto.SimpleVolView
verList *proto.VolVersionInfoList
)
if volumeInfo, err = masterClient.AdminAPI().GetVolumeSimpleInfo(mp.config.VolName); err != nil {
log.LogErrorf("action[onStart] GetVolumeSimpleInfo err[%v]", err)
return
}

mp.vol.volDeleteLockTime = volumeInfo.DeleteLockTime
verList, err = masterClient.AdminAPI().GetVerList(mp.config.VolName)

if err != nil {
log.LogErrorf("action[onStart] GetVerList err[%v]", err)
if err = mp.versionInit(isCreate); err != nil {
return
}
if isCreate || len(mp.multiVersionList.VerList) == 0 {
for _, info := range verList.VerList {
if info.Status != proto.VersionNormal {
continue
}
mp.multiVersionList.VerList = append(mp.multiVersionList.VerList, info)
}

log.LogDebugf("action[onStart] verList %v", mp.multiVersionList.VerList)
if err = mp.storeInitMultiversion(); err != nil {
return
}
}

vlen := len(mp.multiVersionList.VerList)
if vlen > 0 {
mp.verSeq = mp.multiVersionList.VerList[vlen-1].Ver
}

mp.volType = volumeInfo.VolType
var ebsClient *blobstore.BlobStoreClient
Expand Down
15 changes: 14 additions & 1 deletion metanode/partition_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,19 @@ func (mp *metaPartition) Apply(command []byte, index uint64) (resp interface{},
return
}

func (mp *metaPartition) runVersionOp() {
mp.verUpdateChan = make(chan []byte, 100)
for {
select {
case verData := <-mp.verUpdateChan:
mp.submit(opFSMVersionOp, verData)
case <-mp.stopC:
log.LogWarnf("runVersionOp exit!")
return
}
}
}

func (mp *metaPartition) fsmVersionOp(reqData []byte) (err error) {
mp.multiVersionList.Lock()
defer mp.multiVersionList.Unlock()
Expand All @@ -462,7 +475,7 @@ func (mp *metaPartition) fsmVersionOp(reqData []byte) (err error) {
if opData.Op == proto.CreateVersionCommit {
cnt := len(mp.multiVersionList.VerList)
if cnt > 0 && mp.multiVersionList.VerList[cnt-1].Ver >= opData.VerSeq {
log.LogErrorf("action[MultiVersionOp] reqeust seq %v lessOrEqual last exist snapshot seq %v",
log.LogErrorf("action[HandleVersionOp] reqeust seq %v lessOrEqual last exist snapshot seq %v",
mp.multiVersionList.VerList[cnt-1].Ver, opData.VerSeq)
return
}
Expand Down
13 changes: 9 additions & 4 deletions metanode/partition_op_extent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package metanode
import (
"encoding/json"
"fmt"
"github.com/cubefs/cubefs/util/exporter"
"os"
"sort"

"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util/errors"
"github.com/cubefs/cubefs/util/exporter"
"github.com/cubefs/cubefs/util/log"
)

Expand Down Expand Up @@ -247,23 +247,28 @@ func (mp *metaPartition) checkVerList(masterListInfo *proto.VolVersionInfoList)
lastSeq = VerList[i].Ver
return false
})
if err = mp.MultiVersionOp(proto.SyncAllVersionList, lastSeq, VerList); err != nil {
if err = mp.HandleVersionOp(proto.SyncAllVersionList, lastSeq, VerList); err != nil {
return
}
}
return
}

func (mp *metaPartition) MultiVersionOp(op uint8, verSeq uint64, verList []*proto.VolVersionInfo) (err error) {
func (mp *metaPartition) HandleVersionOp(op uint8, verSeq uint64, verList []*proto.VolVersionInfo) (err error) {

verData := &VerOpData{
Op: op,
VerSeq: verSeq,
VerList: verList,
}
data, _ := json.Marshal(verData)
_, err = mp.submit(opFSMVersionOp, data)

select {
case mp.verUpdateChan <- data:
log.LogDebugf("mp %v verSeq %v op %v be pushed to queue", mp.config.PartitionId, verSeq, op)
default:
err = fmt.Errorf("mp %v version update channel full, verdata %v not be executed", mp.config.PartitionId, string(data))
}
return
}

Expand Down
1 change: 1 addition & 0 deletions repl/repl_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func (rp *ReplProtocol) readPkgAndPrepare() (err error) {
}
//log.LogDebugf("action[readPkgAndPrepare] packet(%v) op %v from remote(%v) conn(%v) ",
// request.GetUniqueLogId(), request.Opcode, rp.sourceConn.RemoteAddr().String(), rp.sourceConn)

if err = request.resolveFollowersAddr(); err != nil {
err = rp.putResponse(request)
return
Expand Down

0 comments on commit 0e4a8ad

Please sign in to comment.