Skip to content

Commit

Permalink
VirtualNode: deletion-routine refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 authored and adamjensenbot committed Sep 13, 2023
1 parent a632fee commit a930b4d
Show file tree
Hide file tree
Showing 12 changed files with 414 additions and 198 deletions.
46 changes: 40 additions & 6 deletions cmd/liqo-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ func main() {
reqRemoteLiqoPods, err := labels.NewRequirement(consts.ManagedByLabelKey, selection.Equals, []string{consts.ManagedByShadowPodValue})
utilruntime.Must(err)

// Create the main manager.
// This manager caches only the pods that are offloaded from a remote cluster and are scheduled on this.
mgr, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
Expand Down Expand Up @@ -257,7 +259,8 @@ func main() {
utilruntime.Must(err)

// Create an accessory manager that cache only local offloaded pods.
auxmgr, err := ctrl.NewManager(config, ctrl.Options{
// This manager caches only the pods that are offloaded and scheduled on a remote cluster.
auxmgrLocalPods, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts.
Expand All @@ -275,12 +278,42 @@ func main() {
klog.Errorf("Unable to create auxiliary manager: %w", err)
os.Exit(1)
}
if err := mgr.Add(auxmgr); err != nil {

// Create a label selector to filter only the events for virtual kubelet pods
reqVirtualKubeletPods, err := labels.NewRequirement(consts.K8sAppComponentKey, selection.Equals,
[]string{vkMachinery.KubeletBaseLabels[consts.K8sAppComponentKey]})
utilruntime.Must(err)

// Create an accessory manager that cache only local offloaded pods.
// This manager caches only virtual kubelet pods.
auxmgrVirtualKubeletPods, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts.
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.ByObject = map[client.Object]cache.ByObject{
&corev1.Pod{}: {
Label: labels.NewSelector().Add(*reqVirtualKubeletPods),
},
}
return cache.New(config, opts)
},
})

if err != nil {
klog.Errorf("Unable to create auxiliary manager: %w", err)
os.Exit(1)
}

if err := mgr.Add(auxmgrLocalPods); err != nil {
klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err)
os.Exit(1)
}

localPodsClient := auxmgr.GetClient()
if err := mgr.Add(auxmgrVirtualKubeletPods); err != nil {
klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err)
os.Exit(1)
}

// Register the healthiness probes.
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
Expand All @@ -307,7 +340,7 @@ func main() {
os.Exit(1)
}

if err := indexer.IndexField(ctx, auxmgr, &corev1.Pod{}, indexer.FieldNodeNameFromPod, indexer.ExtractNodeName); err != nil {
if err := indexer.IndexField(ctx, auxmgrLocalPods, &corev1.Pod{}, indexer.FieldNodeNameFromPod, indexer.ExtractNodeName); err != nil {
klog.Errorf("Unable to setup the indexer for the Pod nodeName field: %v", err)
os.Exit(1)
}
Expand Down Expand Up @@ -394,7 +427,8 @@ func main() {
virtualNodeReconciler, err := virtualnodectrl.NewVirtualNodeReconciler(
ctx,
mgr.GetClient(),
auxmgr.GetClient(),
auxmgrLocalPods.GetClient(),
auxmgrVirtualKubeletPods.GetClient(),
mgr.GetScheme(),
mgr.GetEventRecorderFor("virtualnode-controller"),
&clusterIdentity,
Expand Down Expand Up @@ -485,7 +519,7 @@ func main() {
podStatusReconciler := &podstatusctrl.PodStatusReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
LocalPodsClient: localPodsClient,
LocalPodsClient: auxmgrLocalPods.GetClient(),
}
if err = podStatusReconciler.SetupWithManager(mgr); err != nil {
klog.Errorf("Unable to start the podstatus reconciler", err)
Expand Down
58 changes: 24 additions & 34 deletions pkg/liqo-controller-manager/virtualnode-controller/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,70 +15,60 @@
package virtualnodectrl

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
)

// VkConditionMap is a map of virtual node conditions.
type VkConditionMap map[virtualkubeletv1alpha1.VirtualNodeConditionType]VkCondition
// VnConditionMap is a map of virtual node conditions.
type VnConditionMap map[virtualkubeletv1alpha1.VirtualNodeConditionType]VnCondition

// VkCondition is a virtual node condition.
type VkCondition struct {
// VnCondition is a virtual node condition.
type VnCondition struct {
Status virtualkubeletv1alpha1.VirtualNodeConditionStatusType
Message string
}

// ForgeCondition forges a virtual node condition.
func ForgeCondition(
virtualNode *virtualkubeletv1alpha1.VirtualNode,
vkConditions VkConditionMap) (update bool) {
for nameCondition, vkCondition := range vkConditions {
vnConditions VnConditionMap) {
for nameCondition, vnCondition := range vnConditions {
for i := range virtualNode.Status.Conditions {
if virtualNode.Status.Conditions[i].Type != nameCondition {
continue
}
if virtualNode.Status.Conditions[i].Status == vkCondition.Status {
return false
if virtualNode.Status.Conditions[i].Status == vnCondition.Status {
return
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.RunningConditionStatusType) &&
(vkCondition.Status == virtualkubeletv1alpha1.CreatingConditionStatusType) {
return false
(vnCondition.Status == virtualkubeletv1alpha1.CreatingConditionStatusType) {
return
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.NoneConditionStatusType) &&
(vnCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType) {
return
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.NoneConditionStatusType) &&
(vnCondition.Status == virtualkubeletv1alpha1.DeletingConditionStatusType) {
return
}
if (virtualNode.Status.Conditions[i].Status == virtualkubeletv1alpha1.DeletingConditionStatusType) &&
vkCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType {
return false
vnCondition.Status == virtualkubeletv1alpha1.DrainingConditionStatusType {
return
}
virtualNode.Status.Conditions[i].Status = vkCondition.Status
virtualNode.Status.Conditions[i].Status = vnCondition.Status
virtualNode.Status.Conditions[i].LastTransitionTime = metav1.Now()
virtualNode.Status.Conditions[i].Message = vkCondition.Message
return true
virtualNode.Status.Conditions[i].Message = vnCondition.Message
}
virtualNode.Status.Conditions = append(virtualNode.Status.Conditions,
virtualkubeletv1alpha1.VirtualNodeCondition{
Type: nameCondition,
Status: vkCondition.Status,
Status: vnCondition.Status,
LastTransitionTime: metav1.Now(),
Message: vkCondition.Message,
Message: vnCondition.Message,
})
}
return true
}

// UpdateCondition updates the condition of the virtual node.
func UpdateCondition(ctx context.Context, cl client.Client,
virtualNode *virtualkubeletv1alpha1.VirtualNode,
vkConditions VkConditionMap,
) error {
if ForgeCondition(virtualNode, vkConditions) {
if err := cl.Status().Update(ctx, virtualNode); err != nil {
return err
}
}
return nil
}

// GetCondition returns the condition of the virtual node.
Expand Down
Loading

0 comments on commit a930b4d

Please sign in to comment.