From 2ab36388d86de1905aeaea2181436a4b5bd233c2 Mon Sep 17 00:00:00 2001 From: Francesco Cheinasso Date: Wed, 6 Sep 2023 17:09:36 +0200 Subject: [PATCH] VirtualNode: deletion-routine refactoring --- cmd/liqo-controller-manager/main.go | 46 ++++- .../virtualnode-controller/condition.go | 38 ++-- .../deletion-routine.go | 163 ++++++++++++------ .../virtualnode-controller/finalizer.go | 1 + .../virtualnode-controller/suite_test.go | 1 + .../virtualnode-controller/virtualkubelet.go | 77 +++------ .../virtualnode_controller.go | 20 ++- pkg/utils/getters/k8sGetters.go | 13 ++ pkg/vkMachinery/const.go | 3 + pkg/vkMachinery/forge/forge.go | 2 +- pkg/vkMachinery/utils/doc.go | 16 ++ pkg/vkMachinery/utils/utils.go | 106 ++++++++++++ 12 files changed, 355 insertions(+), 131 deletions(-) create mode 100644 pkg/vkMachinery/utils/doc.go create mode 100644 pkg/vkMachinery/utils/utils.go diff --git a/cmd/liqo-controller-manager/main.go b/cmd/liqo-controller-manager/main.go index cb5cefd969..b5705a2b51 100644 --- a/cmd/liqo-controller-manager/main.go +++ b/cmd/liqo-controller-manager/main.go @@ -227,6 +227,8 @@ func main() { reqRemoteLiqoPods, err := labels.NewRequirement(consts.ManagedByLabelKey, selection.Equals, []string{consts.ManagedByShadowPodValue}) utilruntime.Must(err) + // Create the main manager. + // This manager caches only the pods that are offloaded from a remote cluster and are scheduled on this. mgr, err := ctrl.NewManager(config, ctrl.Options{ MapperProvider: mapper.LiqoMapperProvider(scheme), Scheme: scheme, @@ -257,7 +259,8 @@ func main() { utilruntime.Must(err) // Create an accessory manager that cache only local offloaded pods. - auxmgr, err := ctrl.NewManager(config, ctrl.Options{ + // This manager caches only the pods that are offloaded and scheduled on a remote cluster. + auxmgrLocalPods, err := ctrl.NewManager(config, ctrl.Options{ MapperProvider: mapper.LiqoMapperProvider(scheme), Scheme: scheme, MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts. @@ -275,12 +278,42 @@ func main() { klog.Errorf("Unable to create auxiliary manager: %w", err) os.Exit(1) } - if err := mgr.Add(auxmgr); err != nil { + + // Create a label selector to filter only the events for virtual kubelet pods + reqVirtualKubeletPods, err := labels.NewRequirement(consts.K8sAppComponentKey, selection.Equals, + []string{vkMachinery.KubeletBaseLabels[consts.K8sAppComponentKey]}) + utilruntime.Must(err) + + // Create an accessory manager that cache only local offloaded pods. + // This manager caches only virtual kubelet pods. + auxmgrVirtualKubeletPods, err := ctrl.NewManager(config, ctrl.Options{ + MapperProvider: mapper.LiqoMapperProvider(scheme), + Scheme: scheme, + MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts. + NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) { + opts.ByObject = map[client.Object]cache.ByObject{ + &corev1.Pod{}: { + Label: labels.NewSelector().Add(*reqVirtualKubeletPods), + }, + } + return cache.New(config, opts) + }, + }) + + if err != nil { + klog.Errorf("Unable to create auxiliary manager: %w", err) + os.Exit(1) + } + + if err := mgr.Add(auxmgrLocalPods); err != nil { klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err) os.Exit(1) } - localPodsClient := auxmgr.GetClient() + if err := mgr.Add(auxmgrVirtualKubeletPods); err != nil { + klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err) + os.Exit(1) + } // Register the healthiness probes. if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { @@ -307,7 +340,7 @@ func main() { os.Exit(1) } - if err := indexer.IndexField(ctx, auxmgr, &corev1.Pod{}, indexer.FieldNodeNameFromPod, indexer.ExtractNodeName); err != nil { + if err := indexer.IndexField(ctx, auxmgrLocalPods, &corev1.Pod{}, indexer.FieldNodeNameFromPod, indexer.ExtractNodeName); err != nil { klog.Errorf("Unable to setup the indexer for the Pod nodeName field: %v", err) os.Exit(1) } @@ -394,7 +427,8 @@ func main() { virtualNodeReconciler, err := virtualnodectrl.NewVirtualNodeReconciler( ctx, mgr.GetClient(), - auxmgr.GetClient(), + auxmgrLocalPods.GetClient(), + auxmgrVirtualKubeletPods.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("virtualnode-controller"), &clusterIdentity, @@ -485,7 +519,7 @@ func main() { podStatusReconciler := &podstatusctrl.PodStatusReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - LocalPodsClient: localPodsClient, + LocalPodsClient: auxmgrLocalPods.GetClient(), } if err = podStatusReconciler.SetupWithManager(mgr); err != nil { klog.Errorf("Unable to start the podstatus reconciler", err) diff --git a/pkg/liqo-controller-manager/virtualnode-controller/condition.go b/pkg/liqo-controller-manager/virtualnode-controller/condition.go index 1e7e40ef51..151b8ffc6c 100644 --- a/pkg/liqo-controller-manager/virtualnode-controller/condition.go +++ b/pkg/liqo-controller-manager/virtualnode-controller/condition.go @@ -23,11 +23,11 @@ import ( virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1" ) -// VkConditionMap is a map of virtual node conditions. -type VkConditionMap map[virtualkubeletv1alpha1.VirtualNodeConditionType]VkCondition +// VnConditionMap is a map of virtual node conditions. +type VnConditionMap map[virtualkubeletv1alpha1.VirtualNodeConditionType]VnCondition -// VkCondition is a virtual node condition. -type VkCondition struct { +// VnCondition is a virtual node condition. +type VnCondition struct { Status virtualkubeletv1alpha1.VirtualNodeConditionStatusType Message string } @@ -35,34 +35,42 @@ type VkCondition struct { // ForgeCondition forges a virtual node condition. func ForgeCondition( virtualNode *virtualkubeletv1alpha1.VirtualNode, - vkConditions VkConditionMap) (update bool) { - for nameCondition, vkCondition := range vkConditions { + vnConditions VnConditionMap) (update bool) { + for nameCondition, vnCondition := range vnConditions { for i := range virtualNode.Status.Conditions { if virtualNode.Status.Conditions[i].Type != nameCondition { continue } - if virtualNode.Status.Conditions[i].Status == vkCondition.Status { + if virtualNode.Status.Conditions[i].Status == vnCondition.Status { return false } if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.RunningConditionStatusType) && - (vkCondition.Status == virtualkubeletv1alpha1.CreatingConditionStatusType) { + (vnCondition.Status == virtualkubeletv1alpha1.CreatingConditionStatusType) { + return false + } + if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.NoneConditionStatusType) && + (vnCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType) { + return false + } + if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.NoneConditionStatusType) && + (vnCondition.Status == virtualkubeletv1alpha1.DeletingConditionStatusType) { return false } if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.DeletingConditionStatusType) && - vkCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType { + vnCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType { return false } - virtualNode.Status.Conditions[i].Status = vkCondition.Status + virtualNode.Status.Conditions[i].Status = vnCondition.Status virtualNode.Status.Conditions[i].LastTransitionTime = metav1.Now() - virtualNode.Status.Conditions[i].Message = vkCondition.Message + virtualNode.Status.Conditions[i].Message = vnCondition.Message return true } virtualNode.Status.Conditions = append(virtualNode.Status.Conditions, virtualkubeletv1alpha1.VirtualNodeCondition{ Type: nameCondition, - Status: vkCondition.Status, + Status: vnCondition.Status, LastTransitionTime: metav1.Now(), - Message: vkCondition.Message, + Message: vnCondition.Message, }) } return true @@ -71,9 +79,9 @@ func ForgeCondition( // UpdateCondition updates the condition of the virtual node. func UpdateCondition(ctx context.Context, cl client.Client, virtualNode *virtualkubeletv1alpha1.VirtualNode, - vkConditions VkConditionMap, + vnConditions VnConditionMap, ) error { - if ForgeCondition(virtualNode, vkConditions) { + if ForgeCondition(virtualNode, vnConditions) { if err := cl.Status().Update(ctx, virtualNode); err != nil { return err } diff --git a/pkg/liqo-controller-manager/virtualnode-controller/deletion-routine.go b/pkg/liqo-controller-manager/virtualnode-controller/deletion-routine.go index ebf7fb4086..7363b065b0 100644 --- a/pkg/liqo-controller-manager/virtualnode-controller/deletion-routine.go +++ b/pkg/liqo-controller-manager/virtualnode-controller/deletion-routine.go @@ -19,18 +19,28 @@ package virtualnodectrl import ( "context" "fmt" + "strconv" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1" "github.com/liqotech/liqo/pkg/utils/getters" + vkMachineryForge "github.com/liqotech/liqo/pkg/vkMachinery/forge" + vkutils "github.com/liqotech/liqo/pkg/vkMachinery/utils" ) var ( deletionRoutineRunning = false + createNodeFalseFlag = vkutils.Flag{ + Name: string(vkMachineryForge.CreateNode), + Value: strconv.FormatBool(false), + } ) // DeletionRoutine is responsible for deleting a virtual node. @@ -53,64 +63,114 @@ func RunDeletionRoutine(ctx context.Context, r *VirtualNodeReconciler) (*Deletio return dr, nil } +// EnsureNodeAbsence adds a virtual node to the deletion queue. +func (dr *DeletionRoutine) EnsureNodeAbsence(vn *virtualkubeletv1alpha1.VirtualNode) error { + key, err := cache.MetaNamespaceKeyFunc(vn) + if err != nil { + return fmt.Errorf("error getting key: %w", err) + } + dr.wq.AddRateLimited(key) + return nil +} + func (dr *DeletionRoutine) run(ctx context.Context) { - for { - vni, _ := dr.wq.Get() - vn := vni.(*virtualkubeletv1alpha1.VirtualNode) - klog.Infof("Deletion routine started for virtual node %s", vn.Name) - if err := UpdateCondition(ctx, dr.vnr.Client, vn, - VkConditionMap{ - virtualkubeletv1alpha1.NodeConditionType: VkCondition{ - Status: virtualkubeletv1alpha1.DrainingConditionStatusType, - }, - }); err != nil { - dr.reEnqueueVirtualNode(vn, fmt.Errorf("error updating condition: %w", err)) - continue - } + defer klog.Error("Deletion routine stopped") + for dr.processNextItem(ctx) { + } +} - node, err := getters.GetNodeFromVirtualNode(ctx, dr.vnr.Client, vn) - if client.IgnoreNotFound(err) != nil { - dr.reEnqueueVirtualNode(vn, fmt.Errorf("error getting node: %w", err)) - continue - } +func (dr *DeletionRoutine) processNextItem(ctx context.Context) bool { + var err error + key, quit := dr.wq.Get() + if quit { + return false + } + defer dr.wq.Done(key) - if node != nil { - if err := dr.deleteNode(ctx, node, vn); err != nil { - dr.reEnqueueVirtualNode(vn, fmt.Errorf("error deleting node: %w", err)) - continue - } + ref, ok := key.(string) + if !ok { + klog.Errorf("expected string in workqueue but got %#v", key) + return true + } + + if err = dr.handle(ctx, ref); err == nil { + dr.wq.Forget(key) + return true + } + + klog.Errorf("error processing %q (will retry): %v", key, err) + dr.wq.AddRateLimited(key) + return true +} + +func (dr *DeletionRoutine) handle(ctx context.Context, key string) error { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return fmt.Errorf("error splitting key: %w", err) + } + ref := types.NamespacedName{Namespace: namespace, Name: name} + vn := &virtualkubeletv1alpha1.VirtualNode{} + if err := dr.vnr.Client.Get(ctx, ref, vn); err != nil { + if k8serrors.IsNotFound(err) { + return nil } + return fmt.Errorf("error getting virtual node: %w", err) + } + klog.Infof("Deletion routine started for virtual node %s", vn.Name) + if err := UpdateCondition(ctx, dr.vnr.Client, vn, + VnConditionMap{ + virtualkubeletv1alpha1.NodeConditionType: VnCondition{ + Status: virtualkubeletv1alpha1.DrainingConditionStatusType, + }, + }); err != nil { + return fmt.Errorf("error updating condition: %w", err) + } - if !vn.DeletionTimestamp.IsZero() { - if err := dr.vnr.ensureNamespaceMapAbsence(ctx, vn); err != nil { - dr.reEnqueueVirtualNode(vn, fmt.Errorf("error deleting namespace map: %w", err)) - continue - } - err := dr.vnr.removeVirtualNodeFinalizer(ctx, vn) - if err != nil { - dr.reEnqueueVirtualNode(vn, fmt.Errorf("error removing finalizer: %w", err)) - continue + node, err := getters.GetNodeFromVirtualNode(ctx, dr.vnr.Client, vn) + if client.IgnoreNotFound(err) != nil { + return fmt.Errorf("error getting node: %w", err) + } + + if node != nil { + if !*vn.Spec.CreateNode { + // We need to ensure that the current pods will no recreate the node after deleting it. + if found, err := vkutils.CheckVirtualKubeletFlagsConsistence( + ctx, dr.vnr.ClientVK, vn, dr.vnr.VirtualKubeletOptions, createNodeFalseFlag); err != nil || !found { + if err == nil { + err = fmt.Errorf("virtual kubelet pods are still running with arg %s", createNodeFalseFlag.String()) + } + return fmt.Errorf("error checking virtual kubelet pods: %w", err) } } - - klog.Infof("Deletion routine completed for virtual node %s", vn.Name) - dr.wq.Forget(vn) - dr.wq.Done(vn) + if err := dr.deleteNode(ctx, node, vn); err != nil { + return fmt.Errorf("error deleting node: %w", err) + } } -} -// reEnqueueVirtualNode re-enqueues a virtual node in the deletion queue. -func (dr *DeletionRoutine) reEnqueueVirtualNode(vn *virtualkubeletv1alpha1.VirtualNode, err error) { - if err != nil { - klog.Error(err) + if !vn.DeletionTimestamp.IsZero() { + // VirtualNode resource is being deleted. + if err := dr.vnr.ensureNamespaceMapAbsence(ctx, vn); err != nil { + return fmt.Errorf("error deleting namespace map: %w", err) + } + err := dr.vnr.removeVirtualNodeFinalizer(ctx, vn) + if err != nil { + return fmt.Errorf("error removing finalizer: %w", err) + } + } else { + // Node is being deleted, but the VirtualNode resource is not. + // The VirtualNode .Spec.CreateNode field is set to false. + if err := UpdateCondition(ctx, dr.vnr.Client, vn, + VnConditionMap{ + virtualkubeletv1alpha1.NodeConditionType: VnCondition{ + Status: virtualkubeletv1alpha1.NoneConditionStatusType, + }, + }); err != nil { + return fmt.Errorf("error updating condition: %w", err) + } } - dr.wq.Done(vn) - dr.wq.AddRateLimited(vn) -} -// EnsureNodeAbsence adds a virtual node to the deletion queue. -func (dr *DeletionRoutine) EnsureNodeAbsence(vn *virtualkubeletv1alpha1.VirtualNode) { - dr.wq.AddRateLimited(vn) + klog.Infof("Deletion routine completed for virtual node %s", vn.Name) + return nil } // deleteNode deletes the Node created by VirtualNode. @@ -129,8 +189,8 @@ func (dr *DeletionRoutine) deleteNode(ctx context.Context, node *corev1.Node, vn if !vn.DeletionTimestamp.IsZero() { if err := UpdateCondition(ctx, dr.vnr.Client, vn, - VkConditionMap{ - virtualkubeletv1alpha1.VirtualKubeletConditionType: VkCondition{ + VnConditionMap{ + virtualkubeletv1alpha1.VirtualKubeletConditionType: VnCondition{ Status: virtualkubeletv1alpha1.DeletingConditionStatusType, }, }, @@ -141,12 +201,11 @@ func (dr *DeletionRoutine) deleteNode(ctx context.Context, node *corev1.Node, vn return fmt.Errorf("error deleting virtual kubelet deployment: %w", err) } } - klog.Infof("VirtualKubelet deployment %s deleted", vn.Name) if err := UpdateCondition(ctx, dr.vnr.Client, vn, - VkConditionMap{ - virtualkubeletv1alpha1.NodeConditionType: VkCondition{ + VnConditionMap{ + virtualkubeletv1alpha1.NodeConditionType: VnCondition{ Status: virtualkubeletv1alpha1.DeletingConditionStatusType, }, }); err != nil { diff --git a/pkg/liqo-controller-manager/virtualnode-controller/finalizer.go b/pkg/liqo-controller-manager/virtualnode-controller/finalizer.go index be04980417..7122ba0981 100644 --- a/pkg/liqo-controller-manager/virtualnode-controller/finalizer.go +++ b/pkg/liqo-controller-manager/virtualnode-controller/finalizer.go @@ -34,5 +34,6 @@ func (r *VirtualNodeReconciler) ensureVirtualNodeFinalizerPresence(ctx context.C func (r *VirtualNodeReconciler) removeVirtualNodeFinalizer(ctx context.Context, virtualNode *virtualkubeletv1alpha1.VirtualNode) error { ctrlutil.RemoveFinalizer(virtualNode, virtualNodeControllerFinalizer) + klog.Infof("Removing finalizer %s from virtual-node %s", virtualNodeControllerFinalizer, virtualNode.Name) return r.Client.Update(ctx, virtualNode) } diff --git a/pkg/liqo-controller-manager/virtualnode-controller/suite_test.go b/pkg/liqo-controller-manager/virtualnode-controller/suite_test.go index 85a7a92983..921eb653a5 100644 --- a/pkg/liqo-controller-manager/virtualnode-controller/suite_test.go +++ b/pkg/liqo-controller-manager/virtualnode-controller/suite_test.go @@ -123,6 +123,7 @@ var _ = BeforeSuite(func() { Expect(k8sClient).ToNot(BeNil()) vnr, err := NewVirtualNodeReconciler(ctx, + k8sClient, k8sClient, k8sClient, scheme.Scheme, diff --git a/pkg/liqo-controller-manager/virtualnode-controller/virtualkubelet.go b/pkg/liqo-controller-manager/virtualnode-controller/virtualkubelet.go index db6f6e9a70..341f266870 100644 --- a/pkg/liqo-controller-manager/virtualnode-controller/virtualkubelet.go +++ b/pkg/liqo-controller-manager/virtualnode-controller/virtualkubelet.go @@ -23,26 +23,31 @@ import ( rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" - "k8s.io/utils/strings" + k8strings "k8s.io/utils/strings" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1" "github.com/liqotech/liqo/pkg/vkMachinery" vkforge "github.com/liqotech/liqo/pkg/vkMachinery/forge" + vkutils "github.com/liqotech/liqo/pkg/vkMachinery/utils" ) // createVirtualKubeletDeployment creates the VirtualKubelet Deployment. func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence( ctx context.Context, cl client.Client, virtualNode *virtualkubeletv1alpha1.VirtualNode) error { + var nodeStatusInitial virtualkubeletv1alpha1.VirtualNodeConditionStatusType + if *virtualNode.Spec.CreateNode { + nodeStatusInitial = virtualkubeletv1alpha1.CreatingConditionStatusType + } else { + nodeStatusInitial = virtualkubeletv1alpha1.NoneConditionStatusType + } if err := UpdateCondition(ctx, cl, virtualNode, - VkConditionMap{ - virtualkubeletv1alpha1.VirtualKubeletConditionType: VkCondition{ - Status: virtualkubeletv1alpha1.CreatingConditionStatusType, - }, - virtualkubeletv1alpha1.NodeConditionType: VkCondition{ + VnConditionMap{ + virtualkubeletv1alpha1.VirtualKubeletConditionType: VnCondition{ Status: virtualkubeletv1alpha1.CreatingConditionStatusType, }, + virtualkubeletv1alpha1.NodeConditionType: VnCondition{Status: nodeStatusInitial}, }, ); err != nil { return err @@ -98,25 +103,28 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence( } if err := UpdateCondition(ctx, cl, virtualNode, - VkConditionMap{ - virtualkubeletv1alpha1.VirtualKubeletConditionType: VkCondition{ + VnConditionMap{ + virtualkubeletv1alpha1.VirtualKubeletConditionType: VnCondition{ Status: virtualkubeletv1alpha1.RunningConditionStatusType, }, }); err != nil { return err } - return UpdateCondition(ctx, cl, virtualNode, - VkConditionMap{ - virtualkubeletv1alpha1.NodeConditionType: VkCondition{ - Status: virtualkubeletv1alpha1.RunningConditionStatusType, - }, - }) + if *virtualNode.Spec.CreateNode { + return UpdateCondition(ctx, cl, virtualNode, + VnConditionMap{ + virtualkubeletv1alpha1.NodeConditionType: VnCondition{ + Status: virtualkubeletv1alpha1.RunningConditionStatusType, + }, + }) + } + return nil } // ensureVirtualKubeletDeploymentAbsence deletes the VirtualKubelet Deployment. func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentAbsence( ctx context.Context, virtualNode *virtualkubeletv1alpha1.VirtualNode) (reEnque bool, err error) { - virtualKubeletDeployment, err := r.getVirtualKubeletDeployment(ctx, virtualNode) + virtualKubeletDeployment, err := vkutils.GetVirtualKubeletDeployment(ctx, r.Client, virtualNode, r.VirtualKubeletOptions) if err != nil { klog.Error(err) return true, err @@ -134,12 +142,12 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentAbsence( return true, err } - if ok, err := checkVirtualKubeletPodAbsence(ctx, r.Client, virtualNode, r.VirtualKubeletOptions); err != nil || !ok { + if ok, err := vkutils.CheckVirtualKubeletPodAbsence(ctx, r.ClientVK, virtualNode, r.VirtualKubeletOptions); err != nil || !ok { return true, err } if err := r.Client.Delete(ctx, &rbacv1.ClusterRoleBinding{ObjectMeta: metav1.ObjectMeta{ - Name: strings.ShortenString(fmt.Sprintf("%s%s", vkMachinery.CRBPrefix, virtualNode.Name), 253), + Name: k8strings.ShortenString(fmt.Sprintf("%s%s", vkMachinery.CRBPrefix, virtualNode.Name), 253), }}); err != nil { klog.Error(err) return true, err @@ -154,38 +162,3 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentAbsence( return false, nil } - -// getVirtualKubeletDeployment returns the VirtualKubelet Deployment given a VirtualNode. -func (r *VirtualNodeReconciler) getVirtualKubeletDeployment( - ctx context.Context, virtualNode *virtualkubeletv1alpha1.VirtualNode) (*appsv1.Deployment, error) { - var deployList appsv1.DeploymentList - labels := vkforge.VirtualKubeletLabels(virtualNode, r.VirtualKubeletOptions) - if err := r.Client.List(ctx, &deployList, client.MatchingLabels(labels)); err != nil { - klog.Error(err) - return nil, err - } - - if len(deployList.Items) == 0 { - klog.V(4).Infof("[%v] no VirtualKubelet deployment found", virtualNode.Spec.ClusterIdentity.ClusterID) - return nil, nil - } else if len(deployList.Items) > 1 { - err := fmt.Errorf("[%v] more than one VirtualKubelet deployment found", virtualNode.Spec.ClusterIdentity.ClusterID) - klog.Error(err) - return nil, err - } - - return &deployList.Items[0], nil -} - -func checkVirtualKubeletPodAbsence(ctx context.Context, cl client.Client, - vn *virtualkubeletv1alpha1.VirtualNode, vkopt *vkforge.VirtualKubeletOpts) (bool, error) { - klog.Warningf("[%v] checking virtual-kubelet pod absence", vn.Spec.ClusterIdentity.ClusterID) - list := &corev1.PodList{} - labels := vkforge.VirtualKubeletLabels(vn, vkopt) - err := cl.List(ctx, list, client.MatchingLabels(labels)) - if err != nil { - klog.Error(err) - return false, nil - } - return len(list.Items) == 0, err -} diff --git a/pkg/liqo-controller-manager/virtualnode-controller/virtualnode_controller.go b/pkg/liqo-controller-manager/virtualnode-controller/virtualnode_controller.go index 3f808ec6b4..c882b20916 100644 --- a/pkg/liqo-controller-manager/virtualnode-controller/virtualnode_controller.go +++ b/pkg/liqo-controller-manager/virtualnode-controller/virtualnode_controller.go @@ -49,7 +49,10 @@ const ( // VirtualNodeReconciler manage NamespaceMap lifecycle. type VirtualNodeReconciler struct { client.Client - ClientLocal client.Client + // Client used to list local pods + ClientLocal client.Client + // Client used to list virtual-kubelet pods + ClientVK client.Client Scheme *runtime.Scheme HomeClusterIdentity *discoveryv1alpha1.ClusterIdentity VirtualKubeletOptions *vkforge.VirtualKubeletOpts @@ -60,13 +63,14 @@ type VirtualNodeReconciler struct { // NewVirtualNodeReconciler returns a new VirtualNodeReconciler. func NewVirtualNodeReconciler( ctx context.Context, - cl client.Client, cll client.Client, + cl client.Client, cll client.Client, clvk client.Client, s *runtime.Scheme, er record.EventRecorder, hci *discoveryv1alpha1.ClusterIdentity, vko *vkforge.VirtualKubeletOpts, ) (*VirtualNodeReconciler, error) { vnr := &VirtualNodeReconciler{ Client: cl, ClientLocal: cll, + ClientVK: clvk, Scheme: s, HomeClusterIdentity: hci, VirtualKubeletOptions: vko, @@ -109,8 +113,11 @@ func (r *VirtualNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } else { if ctrlutil.ContainsFinalizer(virtualNode, virtualNodeControllerFinalizer) { - r.dr.EnsureNodeAbsence(virtualNode) - return ctrl.Result{Requeue: true}, nil + // If the virtual-node is being deleted, it deletes the node and the virtual-node resource. + if err := r.dr.EnsureNodeAbsence(virtualNode); err != nil { + klog.Errorf(" %s --> Unable to delete the virtual-node", err) + return ctrl.Result{}, err + } } return ctrl.Result{}, nil } @@ -120,7 +127,10 @@ func (r *VirtualNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } if !*virtualNode.Spec.CreateNode { - r.dr.EnsureNodeAbsence(virtualNode) + // If the virtual-node is not enabled, it deletes the node but not the virtual-node resource. + if err := r.dr.EnsureNodeAbsence(virtualNode); err != nil { + klog.Errorf(" %s --> Unable to delete the node", err) + } } // If there is no NamespaceMap associated with this virtual-node, it creates a new one. diff --git a/pkg/utils/getters/k8sGetters.go b/pkg/utils/getters/k8sGetters.go index 9ee21333ec..236e1e07b8 100644 --- a/pkg/utils/getters/k8sGetters.go +++ b/pkg/utils/getters/k8sGetters.go @@ -32,6 +32,7 @@ import ( sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1" "github.com/liqotech/liqo/pkg/consts" + vkforge "github.com/liqotech/liqo/pkg/vkMachinery/forge" ) // GetIPAMStorageByLabel it returns a IPAMStorage instance that matches the given label selector. @@ -312,3 +313,15 @@ func MapForeignClustersByLabel(ctx context.Context, cl client.Client, } return result, nil } + +// ListVirtualKubeletPodsFromVirtualNode returns the list of pods running a VirtualNode's VirtualKubelet. +func ListVirtualKubeletPodsFromVirtualNode(ctx context.Context, cl client.Client, + vn *virtualkubeletv1alpha1.VirtualNode, vkopt *vkforge.VirtualKubeletOpts) (*corev1.PodList, error) { + list := &corev1.PodList{} + vklabels := vkforge.VirtualKubeletLabels(vn, vkopt) + err := cl.List(ctx, list, client.MatchingLabels(vklabels)) + if err != nil { + return nil, err + } + return list, nil +} diff --git a/pkg/vkMachinery/const.go b/pkg/vkMachinery/const.go index 283ebfbce6..49df8f6c6b 100644 --- a/pkg/vkMachinery/const.go +++ b/pkg/vkMachinery/const.go @@ -20,6 +20,9 @@ const LocalClusterRoleName = "liqo-virtual-kubelet-local" // ServiceAccountName -> the name of the service account leveraged by the virtual kubelet. const ServiceAccountName = "virtual-kubelet" +// ContainerName -> the name of the container used to run the virtual kubelet. +const ContainerName = "virtual-kubelet" + // CRBPrefix -> the prefix used to create the virtual kubelet cluster role binding name. const CRBPrefix = "liqo-node-" diff --git a/pkg/vkMachinery/forge/forge.go b/pkg/vkMachinery/forge/forge.go index adc05b6023..200dfbe6cf 100644 --- a/pkg/vkMachinery/forge/forge.go +++ b/pkg/vkMachinery/forge/forge.go @@ -104,7 +104,7 @@ func forgeVKContainers( return []v1.Container{ { - Name: "virtual-kubelet", + Name: vk.ContainerName, Resources: pod.ForgeContainerResources(opts.RequestsCPU, opts.LimitsCPU, opts.RequestsRAM, opts.LimitsRAM), Image: vkImage, Command: command, diff --git a/pkg/vkMachinery/utils/doc.go b/pkg/vkMachinery/utils/doc.go new file mode 100644 index 0000000000..add90b7c97 --- /dev/null +++ b/pkg/vkMachinery/utils/doc.go @@ -0,0 +1,16 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package utils contains some utility functions used by the virtual node controller. +package utils diff --git a/pkg/vkMachinery/utils/utils.go b/pkg/vkMachinery/utils/utils.go new file mode 100644 index 0000000000..4cff44d630 --- /dev/null +++ b/pkg/vkMachinery/utils/utils.go @@ -0,0 +1,106 @@ +// Copyright 2019-2023 The Liqo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "fmt" + "strings" + + appsv1 "k8s.io/api/apps/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1" + "github.com/liqotech/liqo/pkg/utils/getters" + "github.com/liqotech/liqo/pkg/vkMachinery" + vkforge "github.com/liqotech/liqo/pkg/vkMachinery/forge" +) + +// GetVirtualKubeletDeployment returns the VirtualKubelet Deployment of a VirtualNode. +func GetVirtualKubeletDeployment( + ctx context.Context, cl client.Client, virtualNode *virtualkubeletv1alpha1.VirtualNode, + vkopts *vkforge.VirtualKubeletOpts) (*appsv1.Deployment, error) { + var deployList appsv1.DeploymentList + labels := vkforge.VirtualKubeletLabels(virtualNode, vkopts) + if err := cl.List(ctx, &deployList, client.MatchingLabels(labels)); err != nil { + klog.Error(err) + return nil, err + } + + if len(deployList.Items) == 0 { + klog.V(4).Infof("[%v] no VirtualKubelet deployment found", virtualNode.Spec.ClusterIdentity.ClusterID) + return nil, nil + } else if len(deployList.Items) > 1 { + err := fmt.Errorf("[%v] more than one VirtualKubelet deployment found", virtualNode.Spec.ClusterIdentity.ClusterID) + klog.Error(err) + return nil, err + } + + return &deployList.Items[0], nil +} + +// CheckVirtualKubeletPodAbsence checks if a VirtualNode's VirtualKubelet pods are absent. +func CheckVirtualKubeletPodAbsence(ctx context.Context, cl client.Client, + vn *virtualkubeletv1alpha1.VirtualNode, vkopts *vkforge.VirtualKubeletOpts) (bool, error) { + klog.V(4).Infof("[%v] checking virtual-kubelet pod absence", vn.Spec.ClusterIdentity.ClusterName) + list, err := getters.ListVirtualKubeletPodsFromVirtualNode(ctx, cl, vn, vkopts) + if err != nil { + return false, err + } + klog.V(4).Infof("[%v] found %v virtual-kubelet pods", vn.Spec.ClusterIdentity.ClusterName, len(list.Items)) + return len(list.Items) == 0, err +} + +// Flag is a VirtualKubelet flag. +// Name must contain the "--" prefix. +type Flag struct { + Name string + Value string +} + +// String returns the flag as a string. +func (f Flag) String() string { + return fmt.Sprintf("%s=%s", f.Name, f.Value) +} + +// CheckVirtualKubeletFlagsConsistence checks if the VirtualKubelet args are consistent with the flag list provided. +// It returns true if all the flags are consistent, false otherwise. +// A flag is not consistent if it is present in the VirtualKubelet args with a different value. +func CheckVirtualKubeletFlagsConsistence( + ctx context.Context, cl client.Client, vn *virtualkubeletv1alpha1.VirtualNode, vkopts *vkforge.VirtualKubeletOpts, flags ...Flag) (bool, error) { + list, err := getters.ListVirtualKubeletPodsFromVirtualNode(ctx, cl, vn, vkopts) + if err != nil { + return false, err + } + for i := range list.Items { + for j := range list.Items[i].Spec.Containers { + if list.Items[i].Spec.Containers[j].Name != vkMachinery.ContainerName { + continue + } + for _, arg := range list.Items[i].Spec.Containers[j].Args { + for _, flag := range flags { + if strings.HasPrefix(arg, flag.Name) { + if flag.String() != arg { + return false, nil + } + break + } + } + } + } + } + return true, nil +}