Skip to content

Commit

Permalink
VirtualNode: deletion-routine refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Sep 7, 2023
1 parent 459f2e3 commit f481cc2
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ func ForgeCondition(
(vkCondition.Status == virtualkubeletv1alpha1.CreatingConditionStatusType) {
return false
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.NoneConditionStatusType) &&
(vkCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType) {
return false
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.NoneConditionStatusType) &&
(vkCondition.Status == virtualkubeletv1alpha1.DeletingConditionStatusType) {
return false
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.DeletingConditionStatusType) &&
vkCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType {
return false
Expand Down
148 changes: 99 additions & 49 deletions pkg/liqo-controller-manager/virtualnode-controller/deletion-routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ package virtualnodectrl
import (
"context"
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/utils/getters"
vkMachineryForge "github.com/liqotech/liqo/pkg/vkMachinery/forge"
)

var (
Expand All @@ -53,64 +57,111 @@ func RunDeletionRoutine(ctx context.Context, r *VirtualNodeReconciler) (*Deletio
return dr, nil
}

func (dr *DeletionRoutine) run(ctx context.Context) {
for {
vni, _ := dr.wq.Get()
vn := vni.(*virtualkubeletv1alpha1.VirtualNode)
klog.Infof("Deletion routine started for virtual node %s", vn.Name)
if err := UpdateCondition(ctx, dr.vnr.Client, vn,
VkConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VkCondition{
Status: virtualkubeletv1alpha1.DrainingConditionStatusType,
},
}); err != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error updating condition: %w", err))
continue
}
// EnsureNodeAbsence adds a virtual node to the deletion queue.
func (dr *DeletionRoutine) EnsureNodeAbsence(vn *virtualkubeletv1alpha1.VirtualNode) error {
key, err := cache.MetaNamespaceKeyFunc(vn)
if err != nil {
return fmt.Errorf("error getting key: %w", err)
}
dr.wq.AddRateLimited(key)
return nil
}

node, err := getters.GetNodeFromVirtualNode(ctx, dr.vnr.Client, vn)
if client.IgnoreNotFound(err) != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error getting node: %w", err))
continue
}
func (dr *DeletionRoutine) run(ctx context.Context) {
defer klog.Fatal("Deletion routine stopped")
for dr.processNextItem(ctx) {
}
}

if node != nil {
if err := dr.deleteNode(ctx, node, vn); err != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error deleting node: %w", err))
continue
}
}
func (dr *DeletionRoutine) processNextItem(ctx context.Context) bool {
var err error
key, quit := dr.wq.Get()
if quit {
return false
}
defer dr.wq.Done(key)

if !vn.DeletionTimestamp.IsZero() {
if err := dr.vnr.ensureNamespaceMapAbsence(ctx, vn); err != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error deleting namespace map: %w", err))
continue
}
err := dr.vnr.removeVirtualNodeFinalizer(ctx, vn)
if err != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error removing finalizer: %w", err))
continue
}
}
ref, ok := key.(string)
if !ok {
klog.Errorf("expected string in workqueue but got %#v", key)
return true
}

klog.Infof("Deletion routine completed for virtual node %s", vn.Name)
dr.wq.Forget(vn)
dr.wq.Done(vn)
if err = dr.handle(ctx, ref); err == nil {
dr.wq.Forget(key)
return true
}

klog.Errorf("error processing %q (will retry): %v", key, err)
dr.wq.AddRateLimited(key)
return true
}

// reEnqueueVirtualNode re-enqueues a virtual node in the deletion queue.
func (dr *DeletionRoutine) reEnqueueVirtualNode(vn *virtualkubeletv1alpha1.VirtualNode, err error) {
func (dr *DeletionRoutine) handle(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Error(err)
return fmt.Errorf("error splitting key: %w", err)
}
ref := types.NamespacedName{Namespace: namespace, Name: name}
vn := &virtualkubeletv1alpha1.VirtualNode{}
if err := dr.vnr.Client.Get(ctx, ref, vn); err != nil {
return fmt.Errorf("error getting virtual node: %w", err)
}
klog.Infof("Deletion routine started for virtual node %s", vn.Name)
if err := UpdateCondition(ctx, dr.vnr.Client, vn,
VkConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VkCondition{
Status: virtualkubeletv1alpha1.DrainingConditionStatusType,
},
}); err != nil {
return fmt.Errorf("error updating condition: %w", err)
}
dr.wq.Done(vn)
dr.wq.AddRateLimited(vn)
}

// EnsureNodeAbsence adds a virtual node to the deletion queue.
func (dr *DeletionRoutine) EnsureNodeAbsence(vn *virtualkubeletv1alpha1.VirtualNode) {
dr.wq.AddRateLimited(vn)
node, err := getters.GetNodeFromVirtualNode(ctx, dr.vnr.Client, vn)
if client.IgnoreNotFound(err) != nil {
return fmt.Errorf("error getting node: %w", err)
}

if node != nil {
if !*vn.Spec.CreateNode {
// We need to check if the schedule pods could recreate the node before deleting it.
if found, err := dr.vnr.checkVirtualKubeletPodsContainsOnlyArg(
ctx, vn, string(vkMachineryForge.CreateNode), strconv.FormatBool(*vn.Spec.CreateNode)); err != nil || !found {
if err == nil {
err = fmt.Errorf("virtual kubelet pods do not contain only arg %s=%s", vkMachineryForge.CreateNode, strconv.FormatBool(*vn.Spec.CreateNode))
}
return fmt.Errorf("error checking virtual kubelet pods: %w", err)
}
}
if err := dr.deleteNode(ctx, node, vn); err != nil {
return fmt.Errorf("error deleting node: %w", err)
}
}

if !vn.DeletionTimestamp.IsZero() {
// VirtualNode resource is being deleted.
if err := dr.vnr.ensureNamespaceMapAbsence(ctx, vn); err != nil {
return fmt.Errorf("error deleting namespace map: %w", err)
}
err := dr.vnr.removeVirtualNodeFinalizer(ctx, vn)
if err != nil {
return fmt.Errorf("error removing finalizer: %w", err)
}
} else {
// Node is being deleted, but the VirtualNode resource is not.
// The VirtualNode .Spec.CreateNode field is set to false.
if err := UpdateCondition(ctx, dr.vnr.Client, vn,
VkConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VkCondition{
Status: virtualkubeletv1alpha1.NoneConditionStatusType,
},
}); err != nil {
return fmt.Errorf("error updating condition: %w", err)
}
}

klog.Infof("Deletion routine completed for virtual node %s", vn.Name)
return nil
}

// deleteNode deletes the Node created by VirtualNode.
Expand Down Expand Up @@ -141,7 +192,6 @@ func (dr *DeletionRoutine) deleteNode(ctx context.Context, node *corev1.Node, vn
return fmt.Errorf("error deleting virtual kubelet deployment: %w", err)
}
}

klog.Infof("VirtualKubelet deployment %s deleted", vn.Name)

if err := UpdateCondition(ctx, dr.vnr.Client, vn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,38 @@ package virtualnodectrl
import (
"context"
"fmt"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/utils/strings"
k8strings "k8s.io/utils/strings"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/utils/getters"
"github.com/liqotech/liqo/pkg/vkMachinery"
vkforge "github.com/liqotech/liqo/pkg/vkMachinery/forge"
)

// createVirtualKubeletDeployment creates the VirtualKubelet Deployment.
func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence(
ctx context.Context, cl client.Client, virtualNode *virtualkubeletv1alpha1.VirtualNode) error {
var nodeStatusInitial virtualkubeletv1alpha1.VirtualNodeConditionStatusType
if *virtualNode.Spec.CreateNode {
nodeStatusInitial = virtualkubeletv1alpha1.CreatingConditionStatusType
} else {
nodeStatusInitial = virtualkubeletv1alpha1.NoneConditionStatusType
}
if err := UpdateCondition(ctx, cl, virtualNode,
VkConditionMap{
virtualkubeletv1alpha1.VirtualKubeletConditionType: VkCondition{
Status: virtualkubeletv1alpha1.CreatingConditionStatusType,
},
virtualkubeletv1alpha1.NodeConditionType: VkCondition{
Status: virtualkubeletv1alpha1.CreatingConditionStatusType,
},
virtualkubeletv1alpha1.NodeConditionType: VkCondition{Status: nodeStatusInitial},
},
); err != nil {
return err
Expand Down Expand Up @@ -105,12 +111,15 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence(
}); err != nil {
return err
}
return UpdateCondition(ctx, cl, virtualNode,
VkConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VkCondition{
Status: virtualkubeletv1alpha1.RunningConditionStatusType,
},
})
if *virtualNode.Spec.CreateNode {
return UpdateCondition(ctx, cl, virtualNode,
VkConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VkCondition{
Status: virtualkubeletv1alpha1.RunningConditionStatusType,
},
})
}
return nil
}

// ensureVirtualKubeletDeploymentAbsence deletes the VirtualKubelet Deployment.
Expand All @@ -134,12 +143,12 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentAbsence(
return true, err
}

if ok, err := checkVirtualKubeletPodAbsence(ctx, r.Client, virtualNode, r.VirtualKubeletOptions); err != nil || !ok {
if ok, err := r.checkVirtualKubeletPodAbsence(ctx, virtualNode); err != nil || !ok {
return true, err
}

if err := r.Client.Delete(ctx, &rbacv1.ClusterRoleBinding{ObjectMeta: metav1.ObjectMeta{
Name: strings.ShortenString(fmt.Sprintf("%s%s", vkMachinery.CRBPrefix, virtualNode.Name), 253),
Name: k8strings.ShortenString(fmt.Sprintf("%s%s", vkMachinery.CRBPrefix, virtualNode.Name), 253),
}}); err != nil {
klog.Error(err)
return true, err
Expand Down Expand Up @@ -177,15 +186,36 @@ func (r *VirtualNodeReconciler) getVirtualKubeletDeployment(
return &deployList.Items[0], nil
}

func checkVirtualKubeletPodAbsence(ctx context.Context, cl client.Client,
vn *virtualkubeletv1alpha1.VirtualNode, vkopt *vkforge.VirtualKubeletOpts) (bool, error) {
func (r *VirtualNodeReconciler) checkVirtualKubeletPodAbsence(ctx context.Context,
vn *virtualkubeletv1alpha1.VirtualNode) (bool, error) {
klog.Warningf("[%v] checking virtual-kubelet pod absence", vn.Spec.ClusterIdentity.ClusterID)
list := &corev1.PodList{}
labels := vkforge.VirtualKubeletLabels(vn, vkopt)
err := cl.List(ctx, list, client.MatchingLabels(labels))
list, err := getters.ListVirtualKubeletPodsFromVirtualNode(ctx, r.Client, vn, r.VirtualKubeletOptions)
if err != nil {
klog.Error(err)
return false, nil
return false, err
}
return len(list.Items) == 0, err
}

func (r *VirtualNodeReconciler) checkVirtualKubeletPodsContainsOnlyArg(
ctx context.Context, vn *virtualkubeletv1alpha1.VirtualNode, flag, value string) (bool, error) {
list, err := getters.ListVirtualKubeletPodsFromVirtualNode(ctx, r.Client, vn, r.VirtualKubeletOptions)
if err != nil {
return false, err
}
for i := range list.Items {
for j := range list.Items[i].Spec.Containers {
if list.Items[i].Spec.Containers[j].Name != vkMachinery.ContainerName {
continue
}
for _, arg := range list.Items[i].Spec.Containers[j].Args {
if strings.HasPrefix(arg, flag) {
if fmt.Sprintf("--%s=%s", flag, value) != arg {
return false, nil
}
break
}
}
}
}
return true, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ func (r *VirtualNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
} else {
if ctrlutil.ContainsFinalizer(virtualNode, virtualNodeControllerFinalizer) {
r.dr.EnsureNodeAbsence(virtualNode)
return ctrl.Result{Requeue: true}, nil
// If the virtual-node is being deleted, it deletes the node and the virtual-node resource.
if err := r.dr.EnsureNodeAbsence(virtualNode); err != nil {
klog.Errorf(" %s --> Unable to delete the virtual-node", err)
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
Expand All @@ -120,7 +123,10 @@ func (r *VirtualNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
if !*virtualNode.Spec.CreateNode {
r.dr.EnsureNodeAbsence(virtualNode)
// If the virtual-node is not enabled, it deletes the node but not the virtual-node resource.
if err := r.dr.EnsureNodeAbsence(virtualNode); err != nil {
klog.Errorf(" %s --> Unable to delete the node", err)
}
}

// If there is no NamespaceMap associated with this virtual-node, it creates a new one.
Expand Down
13 changes: 13 additions & 0 deletions pkg/utils/getters/k8sGetters.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1"
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/consts"
vkforge "github.com/liqotech/liqo/pkg/vkMachinery/forge"
)

// GetIPAMStorageByLabel it returns a IPAMStorage instance that matches the given label selector.
Expand Down Expand Up @@ -312,3 +313,15 @@ func MapForeignClustersByLabel(ctx context.Context, cl client.Client,
}
return result, nil
}

// ListVirtualKubeletPodsFromVirtualNode returns the list of pods running a VirtualNode's VirtualKubelet.
func ListVirtualKubeletPodsFromVirtualNode(ctx context.Context, cl client.Client,
vn *virtualkubeletv1alpha1.VirtualNode, vkopt *vkforge.VirtualKubeletOpts) (*corev1.PodList, error) {
list := &corev1.PodList{}
vklabels := vkforge.VirtualKubeletLabels(vn, vkopt)
err := cl.List(ctx, list, client.MatchingLabels(vklabels))
if err != nil {
return nil, err
}
return list, nil
}
3 changes: 3 additions & 0 deletions pkg/vkMachinery/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const LocalClusterRoleName = "liqo-virtual-kubelet-local"
// ServiceAccountName -> the name of the service account leveraged by the virtual kubelet.
const ServiceAccountName = "virtual-kubelet"

// ContainerName -> the name of the container used to run the virtual kubelet.
const ContainerName = "virtual-kubelet"

// CRBPrefix -> the prefix used to create the virtual kubelet cluster role binding name.
const CRBPrefix = "liqo-node-"

Expand Down
2 changes: 1 addition & 1 deletion pkg/vkMachinery/forge/forge.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func forgeVKContainers(

return []v1.Container{
{
Name: "virtual-kubelet",
Name: vk.ContainerName,
Resources: pod.ForgeContainerResources(opts.RequestsCPU, opts.LimitsCPU, opts.RequestsRAM, opts.LimitsRAM),
Image: vkImage,
Command: command,
Expand Down

0 comments on commit f481cc2

Please sign in to comment.