Skip to content

Commit

Permalink
VirtualNode: deletion-routine refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Sep 11, 2023
1 parent 004a0cf commit b5ef0ed
Show file tree
Hide file tree
Showing 12 changed files with 355 additions and 131 deletions.
46 changes: 40 additions & 6 deletions cmd/liqo-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ func main() {
reqRemoteLiqoPods, err := labels.NewRequirement(consts.ManagedByLabelKey, selection.Equals, []string{consts.ManagedByShadowPodValue})
utilruntime.Must(err)

// Create the main manager.
// This manager caches only the pods that are offloaded from a remote cluster and are scheduled on this.
mgr, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
Expand Down Expand Up @@ -257,7 +259,8 @@ func main() {
utilruntime.Must(err)

// Create an accessory manager that cache only local offloaded pods.
auxmgr, err := ctrl.NewManager(config, ctrl.Options{
// This manager caches only the pods that are offloaded and scheduled on a remote cluster.
auxmgrLocalPods, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts.
Expand All @@ -275,12 +278,42 @@ func main() {
klog.Errorf("Unable to create auxiliary manager: %w", err)
os.Exit(1)
}
if err := mgr.Add(auxmgr); err != nil {

// Create a label selector to filter only the events for virtual kubelet pods
reqVirtualKubeletPods, err := labels.NewRequirement(consts.K8sAppComponentKey, selection.Equals,
[]string{vkMachinery.KubeletBaseLabels[consts.K8sAppComponentKey]})
utilruntime.Must(err)

// Create an accessory manager that cache only local offloaded pods.
// This manager caches only virtual kubelet pods.
auxmgrVirtualKubeletPods, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts.
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.ByObject = map[client.Object]cache.ByObject{
&corev1.Pod{}: {
Label: labels.NewSelector().Add(*reqVirtualKubeletPods),
},
}
return cache.New(config, opts)
},
})

if err != nil {
klog.Errorf("Unable to create auxiliary manager: %w", err)
os.Exit(1)
}

if err := mgr.Add(auxmgrLocalPods); err != nil {
klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err)
os.Exit(1)
}

localPodsClient := auxmgr.GetClient()
if err := mgr.Add(auxmgrVirtualKubeletPods); err != nil {
klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err)
os.Exit(1)
}

// Register the healthiness probes.
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
Expand All @@ -307,7 +340,7 @@ func main() {
os.Exit(1)
}

if err := indexer.IndexField(ctx, auxmgr, &corev1.Pod{}, indexer.FieldNodeNameFromPod, indexer.ExtractNodeName); err != nil {
if err := indexer.IndexField(ctx, auxmgrLocalPods, &corev1.Pod{}, indexer.FieldNodeNameFromPod, indexer.ExtractNodeName); err != nil {
klog.Errorf("Unable to setup the indexer for the Pod nodeName field: %v", err)
os.Exit(1)
}
Expand Down Expand Up @@ -394,7 +427,8 @@ func main() {
virtualNodeReconciler, err := virtualnodectrl.NewVirtualNodeReconciler(
ctx,
mgr.GetClient(),
auxmgr.GetClient(),
auxmgrLocalPods.GetClient(),
auxmgrVirtualKubeletPods.GetClient(),
mgr.GetScheme(),
mgr.GetEventRecorderFor("virtualnode-controller"),
&clusterIdentity,
Expand Down Expand Up @@ -485,7 +519,7 @@ func main() {
podStatusReconciler := &podstatusctrl.PodStatusReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
LocalPodsClient: localPodsClient,
LocalPodsClient: auxmgrLocalPods.GetClient(),
}
if err = podStatusReconciler.SetupWithManager(mgr); err != nil {
klog.Errorf("Unable to start the podstatus reconciler", err)
Expand Down
38 changes: 23 additions & 15 deletions pkg/liqo-controller-manager/virtualnode-controller/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,54 @@ import (
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
)

// VkConditionMap is a map of virtual node conditions.
type VkConditionMap map[virtualkubeletv1alpha1.VirtualNodeConditionType]VkCondition
// VnConditionMap is a map of virtual node conditions.
type VnConditionMap map[virtualkubeletv1alpha1.VirtualNodeConditionType]VnCondition

// VkCondition is a virtual node condition.
type VkCondition struct {
// VnCondition is a virtual node condition.
type VnCondition struct {
Status virtualkubeletv1alpha1.VirtualNodeConditionStatusType
Message string
}

// ForgeCondition forges a virtual node condition.
func ForgeCondition(
virtualNode *virtualkubeletv1alpha1.VirtualNode,
vkConditions VkConditionMap) (update bool) {
for nameCondition, vkCondition := range vkConditions {
vnConditions VnConditionMap) (update bool) {
for nameCondition, vnCondition := range vnConditions {
for i := range virtualNode.Status.Conditions {
if virtualNode.Status.Conditions[i].Type != nameCondition {
continue
}
if virtualNode.Status.Conditions[i].Status == vkCondition.Status {
if virtualNode.Status.Conditions[i].Status == vnCondition.Status {
return false
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.RunningConditionStatusType) &&
(vkCondition.Status == virtualkubeletv1alpha1.CreatingConditionStatusType) {
(vnCondition.Status == virtualkubeletv1alpha1.CreatingConditionStatusType) {
return false
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.NoneConditionStatusType) &&
(vnCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType) {
return false
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.NoneConditionStatusType) &&
(vnCondition.Status == virtualkubeletv1alpha1.DeletingConditionStatusType) {
return false
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.DeletingConditionStatusType) &&
vkCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType {
vnCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType {
return false
}
virtualNode.Status.Conditions[i].Status = vkCondition.Status
virtualNode.Status.Conditions[i].Status = vnCondition.Status
virtualNode.Status.Conditions[i].LastTransitionTime = metav1.Now()
virtualNode.Status.Conditions[i].Message = vkCondition.Message
virtualNode.Status.Conditions[i].Message = vnCondition.Message
return true
}
virtualNode.Status.Conditions = append(virtualNode.Status.Conditions,
virtualkubeletv1alpha1.VirtualNodeCondition{
Type: nameCondition,
Status: vkCondition.Status,
Status: vnCondition.Status,
LastTransitionTime: metav1.Now(),
Message: vkCondition.Message,
Message: vnCondition.Message,
})
}
return true
Expand All @@ -71,9 +79,9 @@ func ForgeCondition(
// UpdateCondition updates the condition of the virtual node.
func UpdateCondition(ctx context.Context, cl client.Client,
virtualNode *virtualkubeletv1alpha1.VirtualNode,
vkConditions VkConditionMap,
vnConditions VnConditionMap,
) error {
if ForgeCondition(virtualNode, vkConditions) {
if ForgeCondition(virtualNode, vnConditions) {
if err := cl.Status().Update(ctx, virtualNode); err != nil {
return err
}
Expand Down
163 changes: 111 additions & 52 deletions pkg/liqo-controller-manager/virtualnode-controller/deletion-routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,28 @@ package virtualnodectrl
import (
"context"
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/utils/getters"
vkMachineryForge "github.com/liqotech/liqo/pkg/vkMachinery/forge"
vkutils "github.com/liqotech/liqo/pkg/vkMachinery/utils"
)

var (
deletionRoutineRunning = false
createNodeFalseFlag = vkutils.Flag{
Name: string(vkMachineryForge.CreateNode),
Value: strconv.FormatBool(false),
}
)

// DeletionRoutine is responsible for deleting a virtual node.
Expand All @@ -53,64 +63,114 @@ func RunDeletionRoutine(ctx context.Context, r *VirtualNodeReconciler) (*Deletio
return dr, nil
}

// EnsureNodeAbsence adds a virtual node to the deletion queue.
func (dr *DeletionRoutine) EnsureNodeAbsence(vn *virtualkubeletv1alpha1.VirtualNode) error {
key, err := cache.MetaNamespaceKeyFunc(vn)
if err != nil {
return fmt.Errorf("error getting key: %w", err)
}
dr.wq.AddRateLimited(key)
return nil
}

func (dr *DeletionRoutine) run(ctx context.Context) {
for {
vni, _ := dr.wq.Get()
vn := vni.(*virtualkubeletv1alpha1.VirtualNode)
klog.Infof("Deletion routine started for virtual node %s", vn.Name)
if err := UpdateCondition(ctx, dr.vnr.Client, vn,
VkConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VkCondition{
Status: virtualkubeletv1alpha1.DrainingConditionStatusType,
},
}); err != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error updating condition: %w", err))
continue
}
defer klog.Error("Deletion routine stopped")
for dr.processNextItem(ctx) {
}
}

node, err := getters.GetNodeFromVirtualNode(ctx, dr.vnr.Client, vn)
if client.IgnoreNotFound(err) != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error getting node: %w", err))
continue
}
func (dr *DeletionRoutine) processNextItem(ctx context.Context) bool {
var err error
key, quit := dr.wq.Get()
if quit {
return false
}
defer dr.wq.Done(key)

if node != nil {
if err := dr.deleteNode(ctx, node, vn); err != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error deleting node: %w", err))
continue
}
ref, ok := key.(string)
if !ok {
klog.Errorf("expected string in workqueue but got %#v", key)
return true
}

if err = dr.handle(ctx, ref); err == nil {
dr.wq.Forget(key)
return true
}

klog.Errorf("error processing %q (will retry): %v", key, err)
dr.wq.AddRateLimited(key)
return true
}

func (dr *DeletionRoutine) handle(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return fmt.Errorf("error splitting key: %w", err)
}
ref := types.NamespacedName{Namespace: namespace, Name: name}
vn := &virtualkubeletv1alpha1.VirtualNode{}
if err := dr.vnr.Client.Get(ctx, ref, vn); err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error getting virtual node: %w", err)
}
klog.Infof("Deletion routine started for virtual node %s", vn.Name)
if err := UpdateCondition(ctx, dr.vnr.Client, vn,
VnConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VnCondition{
Status: virtualkubeletv1alpha1.DrainingConditionStatusType,
},
}); err != nil {
return fmt.Errorf("error updating condition: %w", err)
}

if !vn.DeletionTimestamp.IsZero() {
if err := dr.vnr.ensureNamespaceMapAbsence(ctx, vn); err != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error deleting namespace map: %w", err))
continue
}
err := dr.vnr.removeVirtualNodeFinalizer(ctx, vn)
if err != nil {
dr.reEnqueueVirtualNode(vn, fmt.Errorf("error removing finalizer: %w", err))
continue
node, err := getters.GetNodeFromVirtualNode(ctx, dr.vnr.Client, vn)
if client.IgnoreNotFound(err) != nil {
return fmt.Errorf("error getting node: %w", err)
}

if node != nil {
if !*vn.Spec.CreateNode {
// We need to ensure that the current pods will no recreate the node after deleting it.
if found, err := vkutils.CheckVirtualKubeletFlagsConsistence(
ctx, dr.vnr.ClientVK, vn, dr.vnr.VirtualKubeletOptions, createNodeFalseFlag); err != nil || !found {
if err == nil {
err = fmt.Errorf("virtual kubelet pods are still running with arg %s", createNodeFalseFlag.String())
}
return fmt.Errorf("error checking virtual kubelet pods: %w", err)
}
}

klog.Infof("Deletion routine completed for virtual node %s", vn.Name)
dr.wq.Forget(vn)
dr.wq.Done(vn)
if err := dr.deleteNode(ctx, node, vn); err != nil {
return fmt.Errorf("error deleting node: %w", err)
}
}
}

// reEnqueueVirtualNode re-enqueues a virtual node in the deletion queue.
func (dr *DeletionRoutine) reEnqueueVirtualNode(vn *virtualkubeletv1alpha1.VirtualNode, err error) {
if err != nil {
klog.Error(err)
if !vn.DeletionTimestamp.IsZero() {
// VirtualNode resource is being deleted.
if err := dr.vnr.ensureNamespaceMapAbsence(ctx, vn); err != nil {
return fmt.Errorf("error deleting namespace map: %w", err)
}
err := dr.vnr.removeVirtualNodeFinalizer(ctx, vn)
if err != nil {
return fmt.Errorf("error removing finalizer: %w", err)
}
} else {
// Node is being deleted, but the VirtualNode resource is not.
// The VirtualNode .Spec.CreateNode field is set to false.
if err := UpdateCondition(ctx, dr.vnr.Client, vn,
VnConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VnCondition{
Status: virtualkubeletv1alpha1.NoneConditionStatusType,
},
}); err != nil {
return fmt.Errorf("error updating condition: %w", err)
}
}
dr.wq.Done(vn)
dr.wq.AddRateLimited(vn)
}

// EnsureNodeAbsence adds a virtual node to the deletion queue.
func (dr *DeletionRoutine) EnsureNodeAbsence(vn *virtualkubeletv1alpha1.VirtualNode) {
dr.wq.AddRateLimited(vn)
klog.Infof("Deletion routine completed for virtual node %s", vn.Name)
return nil
}

// deleteNode deletes the Node created by VirtualNode.
Expand All @@ -129,8 +189,8 @@ func (dr *DeletionRoutine) deleteNode(ctx context.Context, node *corev1.Node, vn

if !vn.DeletionTimestamp.IsZero() {
if err := UpdateCondition(ctx, dr.vnr.Client, vn,
VkConditionMap{
virtualkubeletv1alpha1.VirtualKubeletConditionType: VkCondition{
VnConditionMap{
virtualkubeletv1alpha1.VirtualKubeletConditionType: VnCondition{
Status: virtualkubeletv1alpha1.DeletingConditionStatusType,
},
},
Expand All @@ -141,12 +201,11 @@ func (dr *DeletionRoutine) deleteNode(ctx context.Context, node *corev1.Node, vn
return fmt.Errorf("error deleting virtual kubelet deployment: %w", err)
}
}

klog.Infof("VirtualKubelet deployment %s deleted", vn.Name)

if err := UpdateCondition(ctx, dr.vnr.Client, vn,
VkConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VkCondition{
VnConditionMap{
virtualkubeletv1alpha1.NodeConditionType: VnCondition{
Status: virtualkubeletv1alpha1.DeletingConditionStatusType,
},
}); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ func (r *VirtualNodeReconciler) ensureVirtualNodeFinalizerPresence(ctx context.C

func (r *VirtualNodeReconciler) removeVirtualNodeFinalizer(ctx context.Context, virtualNode *virtualkubeletv1alpha1.VirtualNode) error {
ctrlutil.RemoveFinalizer(virtualNode, virtualNodeControllerFinalizer)
klog.Infof("Removing finalizer %s from virtual-node %s", virtualNodeControllerFinalizer, virtualNode.Name)
return r.Client.Update(ctx, virtualNode)
}
Loading

0 comments on commit b5ef0ed

Please sign in to comment.