Skip to content

Commit

Permalink
SKS-2344: Clean unused CloudTower labels created by CAPE every 24h (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
haijianyang authored Jan 12, 2024
1 parent fb5730b commit f6ab21f
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 1 deletion.
28 changes: 28 additions & 0 deletions controllers/elfcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,32 @@ func (r *ElfClusterReconciler) reconcileDeleteLabel(ctx *context.ClusterContext,
return nil
}

// cleanOrphanLabels removes the labels that CAPE created in Tower and that are
// no longer needed. The GC lock only lets a cleanup start when no other cleanup
// is running for the same Tower and the last successful one is at least 24
// hours old.
//
// A failed cleanup is only logged: no GC time is recorded, so the cleanup is
// attempted again by a later reconcile instead of being retried here.
func (r *ElfClusterReconciler) cleanOrphanLabels(ctx *context.ClusterContext) {
	tower := ctx.ElfCluster.Spec.Tower.Server

	// Only one goroutine may clean the labels of a given Tower at a time.
	if !acquireLockForGCTowerLabels(tower) {
		return
	}
	defer releaseLockForForGCTowerLabels(tower)

	ctx.Logger.V(1).Info(fmt.Sprintf("Cleaning orphan labels in Tower %s created by CAPE", tower))

	labelKeys := []string{
		towerresources.GetVMLabelClusterName(),
		towerresources.GetVMLabelVIP(),
		towerresources.GetVMLabelNamespace(),
	}

	cleanedIDs, err := ctx.VMService.CleanLabels(labelKeys)
	if err != nil {
		ctx.Logger.Error(err, fmt.Sprintf("Warning: failed to clean orphan labels in Tower %s", tower))

		return
	}

	// Record the time only on success so that a failure does not delay the next attempt.
	recordGCTimeForTowerLabels(tower)

	ctx.Logger.V(1).Info(fmt.Sprintf("Labels of Tower %s are cleaned successfully", tower), "labelCount", len(cleanedIDs))
}

func (r *ElfClusterReconciler) reconcileNormal(ctx *context.ClusterContext) (reconcile.Result, error) { //nolint:unparam
ctx.Logger.Info("Reconciling ElfCluster")

Expand All @@ -298,6 +324,8 @@ func (r *ElfClusterReconciler) reconcileNormal(ctx *context.ClusterContext) (rec
return reconcile.Result{}, nil
}

r.cleanOrphanLabels(ctx)

// Wait until the API server is online and accessible.
if !r.isAPIServerOnline(ctx) {
return reconcile.Result{}, nil
Expand Down
41 changes: 41 additions & 0 deletions controllers/elfcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ var _ = Describe("ElfClusterReconciler", func() {
}
fake.InitClusterOwnerReferences(ctrlContext, elfCluster, cluster)

keys := []string{towerresources.GetVMLabelClusterName(), towerresources.GetVMLabelVIP(), towerresources.GetVMLabelNamespace()}
mockVMService.EXPECT().CleanLabels(keys).Return(nil, nil)

elfClusterKey := capiutil.ObjectKey(elfCluster)
reconciler := &ElfClusterReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
_, _ = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey})
Expand Down Expand Up @@ -282,4 +285,42 @@ var _ = Describe("ElfClusterReconciler", func() {
Expect(apierrors.IsNotFound(reconciler.Client.Get(reconciler, elfClusterKey, elfCluster))).To(BeTrue())
})
})

// Specs for ElfClusterReconciler.cleanOrphanLabels: a failed cleanup, a
// successful cleanup, and a cleanup that is skipped right after a success.
Context("CleanLabels", func() {
BeforeEach(func() {
// Presumably clears the cached label GC lock/time entries so that every
// spec starts without a recorded GC time - confirm against resetMemoryCache.
resetMemoryCache()
})

It("should clean labels for Tower", func() {
elfCluster.Spec.ControlPlaneEndpoint.Host = "127.0.0.1"
elfCluster.Spec.ControlPlaneEndpoint.Port = 6443
// ctrlMgrContext := fake.NewControllerManagerContext(cluster, elfCluster)
ctrlContext := newCtrlContexts(elfCluster, cluster)
fake.InitClusterOwnerReferences(ctrlContext, elfCluster, cluster)
// cleanOrphanLabels is called directly, so the cluster context is built by hand
// with the mocked VM service.
clusterContext := &context.ClusterContext{
ControllerContext: ctrlContext,
Cluster: cluster,
ElfCluster: elfCluster,
Logger: ctrllog.Log,
VMService: mockVMService,
}

// 1. CleanLabels fails: the failure is logged and no GC time is recorded.
logBuffer.Reset()
unexpectedError := errors.New("unexpected error")
keys := []string{towerresources.GetVMLabelClusterName(), towerresources.GetVMLabelVIP(), towerresources.GetVMLabelNamespace()}
mockVMService.EXPECT().CleanLabels(keys).Return(nil, unexpectedError)
reconciler := &ElfClusterReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
reconciler.cleanOrphanLabels(clusterContext)
Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Warning: failed to clean orphan labels in Tower %s", elfCluster.Spec.Tower.Server)))

// 2. CleanLabels succeeds: since the previous attempt failed, the cleanup runs
// again immediately and logs the success message.
logBuffer.Reset()
mockVMService.EXPECT().CleanLabels(keys).Return(nil, nil)
reconciler.cleanOrphanLabels(clusterContext)
Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Labels of Tower %s are cleaned successfully", elfCluster.Spec.Tower.Server)))

// 3. No CleanLabels expectation is registered here: the GC time recorded by the
// previous success is less than 24 hours old, so the call must return before
// it even logs the start of a cleanup.
logBuffer.Reset()
reconciler.cleanOrphanLabels(clusterContext)
Expect(logBuffer.String()).NotTo(ContainSubstring(fmt.Sprintf("Cleaning orphan labels in Tower %s created by CAPE", elfCluster.Spec.Tower.Server)))
})
})
})
55 changes: 55 additions & 0 deletions controllers/vm_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,61 @@ func getKeyForVMDuplicate(name string) string {
return fmt.Sprintf("vm:duplicate:%s", name)
}

/* Label */

// labelOperationLock serializes access to the label GC entries kept in
// inMemoryCache. It is held by acquireLockForGCTowerLabels,
// releaseLockForForGCTowerLabels and recordGCTimeForTowerLabels.
var labelOperationLock sync.Mutex

// getKeyForGCLabel returns the cache key under which the "label GC in
// progress" marker of the given Tower is stored.
func getKeyForGCLabel(tower string) string {
	const keyFormat = "label:gc:%s"

	return fmt.Sprintf(keyFormat, tower)
}

// getKeyForGCLabelTime returns the cache key under which the time of the last
// label GC of the given Tower is stored.
func getKeyForGCLabelTime(tower string) string {
	const keyFormat = "label:gc:time:%s"

	return fmt.Sprintf(keyFormat, tower)
}

// acquireLockForGCTowerLabels tries to take the label GC lock of the given
// Tower and reports whether the caller may run the GC.
//
// It returns false when a GC is already in progress for that Tower, or when
// the last recorded GC happened less than 24 hours ago. On success the
// in-progress marker is stored in the cache and must be removed with
// releaseLockForForGCTowerLabels.
func acquireLockForGCTowerLabels(tower string) bool {
	labelOperationLock.Lock()
	defer labelOperationLock.Unlock()

	lockKey := getKeyForGCLabel(tower)
	if _, locked := inMemoryCache.Get(lockKey); locked {
		// Another GC is still running for this Tower.
		return false
	}

	timeKey := getKeyForGCLabelTime(tower)
	if cached, found := inMemoryCache.Get(timeKey); found {
		switch lastGCTime := cached.(type) {
		case time.Time:
			nextGCTime := lastGCTime.Add(24 * time.Hour)
			if time.Now().Before(nextGCTime) {
				return false
			}
		default:
			// Delete unexpected data.
			inMemoryCache.Delete(timeKey)
		}
	}

	inMemoryCache.Set(lockKey, nil, cache.NoExpiration)

	return true
}

// releaseLockForForGCTowerLabels releases the Tower whose labels are being cleared.
func releaseLockForForGCTowerLabels(tower string) {
labelOperationLock.Lock()
defer labelOperationLock.Unlock()

inMemoryCache.Delete(getKeyForGCLabel(tower))
}

// recordGCTimeForTowerLabels records the last GC label time of the specified Tower.
func recordGCTimeForTowerLabels(tower string) {
labelOperationLock.Lock()
defer labelOperationLock.Unlock()

inMemoryCache.Set(getKeyForGCLabelTime(tower), time.Now(), cache.NoExpiration)
}

/* GPU */

type lockedGPUDevice struct {
Expand Down
15 changes: 15 additions & 0 deletions pkg/service/mock_services/vm_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 26 additions & 1 deletion pkg/service/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type VMService interface {
GetVlan(id string) (*models.Vlan, error)
UpsertLabel(key, value string) (*models.Label, error)
DeleteLabel(key, value string, strict bool) (string, error)
CleanLabels(keys []string) ([]string, error)
AddLabelsToVM(vmID string, labels []string) (*models.Task, error)
CreateVMPlacementGroup(name, clusterID string, vmPolicy models.VMVMPolicy) (*models.WithTaskVMPlacementGroup, error)
GetVMPlacementGroup(name string) (*models.VMPlacementGroup, error)
Expand Down Expand Up @@ -769,7 +770,7 @@ func (svr *TowerVMService) DeleteLabel(key, value string, strict bool) (string,
if strict {
deleteLabelParams.RequestBody.Where.AND = append(
deleteLabelParams.RequestBody.Where.AND,
&models.LabelWhereInput{VMNum: TowerInt32(0)},
&models.LabelWhereInput{TotalNum: TowerInt32(0)},
)
}

Expand All @@ -785,6 +786,30 @@ func (svr *TowerVMService) DeleteLabel(key, value string, strict bool) (string,
return *deleteLabelResp.Payload[0].Data.ID, nil
}

// CleanLabels deletes the unused labels that have one of the specified keys.
//
// A label is only deleted when it matches all of the following:
//   - its key is one of keys;
//   - it was created at least 24 hours ago;
//   - its TotalNum is 0, the same condition DeleteLabel applies in strict mode
//     to make sure the label is no longer used.
//
// Without the TotalNum condition every label with a matching key that is older
// than 24 hours would be removed, including labels still attached to resources.
//
// It returns the IDs of the deleted labels, or the error returned by Tower.
// CleanLabels is used to clean unused labels regularly and should not be called frequently.
func (svr *TowerVMService) CleanLabels(keys []string) ([]string, error) {
	// Tower is queried with an RFC 3339 timestamp in UTC.
	createdBefore := time.Now().Add(-24 * time.Hour).UTC().Format(time.RFC3339)

	deleteLabelParams := clientlabel.NewDeleteLabelParams()
	deleteLabelParams.RequestBody = &models.LabelDeletionParams{
		Where: &models.LabelWhereInput{
			KeyIn:        keys,
			CreatedAtLte: TowerString(createdBefore),
			// Only delete labels that are not used by any resource.
			TotalNum: TowerInt32(0),
		},
	}

	deleteLabelResp, err := svr.Session.Label.DeleteLabel(deleteLabelParams)
	if err != nil {
		return nil, err
	}

	labelIDs := make([]string, len(deleteLabelResp.Payload))
	for i := range deleteLabelResp.Payload {
		labelIDs[i] = *deleteLabelResp.Payload[i].Data.ID
	}

	return labelIDs, nil
}

// AddLabelsToVM adds a label to a VM.
func (svr *TowerVMService) AddLabelsToVM(vmID string, labelIds []string) (*models.Task, error) {
addLabelsParams := clientlabel.NewAddLabelsToResourcesParams()
Expand Down

0 comments on commit f6ab21f

Please sign in to comment.