Skip to content

Commit 8f58bda

Browse files
askyrieSongZhen0704
authored andcommitted
fix: genesis sync update storage
1 parent 797324b commit 8f58bda

File tree

6 files changed

+38
-16
lines changed

6 files changed

+38
-16
lines changed

server/controller/genesis/common/type.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,12 @@ type K8SRPCMessage struct {
7272
}
7373

7474
type VIFRPCMessage struct {
75-
ORGID int
76-
MessageType int
77-
TeamID uint32
78-
VtapID uint32
79-
Peer string
80-
K8SClusterID string
81-
Message *agent.GenesisSyncRequest
75+
ORGID int
76+
MessageType int
77+
TeamID uint32
78+
VtapID uint32
79+
Peer string
80+
K8SClusterID string
81+
StorageRefresh bool
82+
Message *agent.GenesisSyncRequest
8283
}

server/controller/genesis/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package config
1919
type GenesisConfig struct {
2020
AgingTime float64 `default:"86400" yaml:"aging_time"`
2121
VinterfaceAgingTime float64 `default:"300" yaml:"vinterface_aging_time"`
22+
AgentHeartBeat float64 `default:"60" yaml:"agent_heart_beat"`
2223
HostIPs []string `yaml:"host_ips"`
2324
LocalIPRanges []string `yaml:"local_ip_ranges"`
2425
ExcludeIPRanges []string `yaml:"exclude_ip_ranges"`

server/controller/genesis/grpc/server.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen
147147
}
148148
vtap := fmt.Sprintf("%d-%d", orgID, vtapID)
149149

150+
var refresh bool
150151
var localVersion uint64 = 0
151152
if vtapID == 0 {
152153
log.Infof("genesis sync received message with vtap_id 0 from %s", remote, logger.NewORGPrefix(orgID))
@@ -160,9 +161,11 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen
160161
} else {
161162
agingTime = g.cfg.VinterfaceAgingTime
162163
}
163-
if now.Sub(lastTime).Seconds() >= agingTime {
164+
timeSub := now.Sub(lastTime).Seconds()
165+
if timeSub >= agingTime {
164166
g.vtapToVersion.Store(vtap, uint64(0))
165167
}
168+
refresh = timeSub >= g.cfg.AgentHeartBeat*2
166169
}
167170
g.vtapToLastSeen.Store(vtap, now)
168171
lVersion, ok := g.vtapToVersion.Load(vtap)
@@ -176,12 +179,13 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *agent.Gen
176179
log.Debugf("genesis sync renew version %v from ip %s vtap_id %v", version, remote, vtapID, logger.NewORGPrefix(orgID))
177180
g.genesisSyncQueue.Put(
178181
common.VIFRPCMessage{
179-
Peer: remote,
180-
VtapID: vtapID,
181-
ORGID: orgID,
182-
TeamID: uint32(teamID),
183-
MessageType: common.TYPE_RENEW,
184-
Message: request,
182+
Peer: remote,
183+
VtapID: vtapID,
184+
ORGID: orgID,
185+
TeamID: uint32(teamID),
186+
MessageType: common.TYPE_RENEW,
187+
Message: request,
188+
StorageRefresh: refresh,
185189
},
186190
)
187191
return &agent.GenesisSyncResponse{Version: &localVersion}, nil

server/controller/genesis/store/sync/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ func (g *GenesisSync) Start() {
422422
if info.VtapID != 0 {
423423
peerInfo, ok := genesisSyncDataByVtap[vtap]
424424
if ok {
425-
vStorage.Renew(info.ORGID, peerInfo)
425+
vStorage.Renew(info.ORGID, info.VtapID, info.StorageRefresh, peerInfo)
426426
}
427427
}
428428
} else if info.MessageType == common.TYPE_UPDATE {

server/controller/genesis/store/sync/store.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func NewSyncStorage(ctx context.Context, cfg config.GenesisConfig, sChan chan co
5757
}
5858
}
5959

60-
func (s *SyncStorage) Renew(orgID int, data common.GenesisSyncDataResponse) {
60+
func (s *SyncStorage) Renew(orgID int, vtapID uint32, refresh bool, data common.GenesisSyncDataResponse) {
6161
now := time.Now()
6262
s.mutex.Lock()
6363
defer s.mutex.Unlock()
@@ -72,6 +72,20 @@ func (s *SyncStorage) Renew(orgID int, data common.GenesisSyncDataResponse) {
7272
s.genesisSyncInfo.IPlastseens.Renew(orgID, now, data.IPLastSeens)
7373
s.genesisSyncInfo.Vinterfaces.Renew(orgID, now, data.Vinterfaces)
7474
s.genesisSyncInfo.Processes.Renew(orgID, now, data.Processes)
75+
76+
if !refresh {
77+
return
78+
}
79+
db, err := metadb.GetDB(orgID)
80+
if err != nil {
81+
log.Error("get metadb session failed", logger.NewORGPrefix(orgID))
82+
return
83+
}
84+
nodeIP := os.Getenv(ccommon.NODE_IP_KEY)
85+
err = db.Model(&model.GenesisStorage{}).Where("vtap_id = ? AND node_ip <> ?", vtapID, nodeIP).Update("node_ip", nodeIP).Error
86+
if err != nil {
87+
log.Warningf("vtap id (%d) refresh storage to node (%s) failed: %s", vtapID, nodeIP, err, logger.NewORGPrefix(orgID))
88+
}
7589
}
7690

7791
func (s *SyncStorage) Update(orgID int, vtapID uint32, data common.GenesisSyncDataResponse) {

server/server.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ controller:
342342

343343
# 数据持久化检测间隔,单位:秒
344344
data_persistence_interval: 60
345+
# 采集器消息心跳时长,单位:秒
346+
agent_heart_beat: 60
345347

346348
# 采集器同步KVM时,配置的采集器IP所上报的内容会被解析
347349
host_ips:

0 commit comments

Comments
 (0)