Skip to content

Commit

Permalink
New auxmanager and client for vk pods
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Sep 11, 2023
1 parent a6adfe0 commit ecb9313
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 73 deletions.
46 changes: 40 additions & 6 deletions cmd/liqo-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ func main() {
reqRemoteLiqoPods, err := labels.NewRequirement(consts.ManagedByLabelKey, selection.Equals, []string{consts.ManagedByShadowPodValue})
utilruntime.Must(err)

// Create the main manager.
// This manager caches only the pods that are offloaded from a remote cluster and are scheduled on this.
mgr, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
Expand Down Expand Up @@ -257,7 +259,8 @@ func main() {
utilruntime.Must(err)

// Create an accessory manager that cache only local offloaded pods.
auxmgr, err := ctrl.NewManager(config, ctrl.Options{
// This manager caches only the pods that are offloaded and scheduled on a remote cluster.
auxmgrLocalPods, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts.
Expand All @@ -275,12 +278,42 @@ func main() {
klog.Errorf("Unable to create auxiliary manager: %w", err)
os.Exit(1)
}
if err := mgr.Add(auxmgr); err != nil {

// Create a label selector to filter only the events for virtual kubelet pods
reqVirtualKubeletPods, err := labels.NewRequirement(consts.K8sAppComponentKey, selection.Equals,
[]string{vkMachinery.KubeletBaseLabels[consts.K8sAppComponentKey]})
utilruntime.Must(err)

// Create an accessory manager that cache only local offloaded pods.
// This manager caches only virtual kubelet pods.
auxmgrVirtualKubeletPods, err := ctrl.NewManager(config, ctrl.Options{
MapperProvider: mapper.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: "0", // Disable the metrics of the auxiliary manager to prevent conflicts.
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.ByObject = map[client.Object]cache.ByObject{
&corev1.Pod{}: {
Label: labels.NewSelector().Add(*reqVirtualKubeletPods),
},
}
return cache.New(config, opts)
},
})

if err != nil {
klog.Errorf("Unable to create auxiliary manager: %w", err)
os.Exit(1)
}

if err := mgr.Add(auxmgrLocalPods); err != nil {
klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err)
os.Exit(1)
}

localPodsClient := auxmgr.GetClient()
if err := mgr.Add(auxmgrVirtualKubeletPods); err != nil {
klog.Errorf("Unable to add the auxiliary manager to the main one: %w", err)
os.Exit(1)
}

// Register the healthiness probes.
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
Expand All @@ -307,7 +340,7 @@ func main() {
os.Exit(1)
}

if err := indexer.IndexField(ctx, auxmgr, &corev1.Pod{}, indexer.FieldNodeNameFromPod, indexer.ExtractNodeName); err != nil {
if err := indexer.IndexField(ctx, auxmgrLocalPods, &corev1.Pod{}, indexer.FieldNodeNameFromPod, indexer.ExtractNodeName); err != nil {
klog.Errorf("Unable to setup the indexer for the Pod nodeName field: %v", err)
os.Exit(1)
}
Expand Down Expand Up @@ -394,7 +427,8 @@ func main() {
virtualNodeReconciler, err := virtualnodectrl.NewVirtualNodeReconciler(
ctx,
mgr.GetClient(),
auxmgr.GetClient(),
auxmgrLocalPods.GetClient(),
auxmgrVirtualKubeletPods.GetClient(),
mgr.GetScheme(),
mgr.GetEventRecorderFor("virtualnode-controller"),
&clusterIdentity,
Expand Down Expand Up @@ -485,7 +519,7 @@ func main() {
podStatusReconciler := &podstatusctrl.PodStatusReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
LocalPodsClient: localPodsClient,
LocalPodsClient: auxmgrLocalPods.GetClient(),
}
if err = podStatusReconciler.SetupWithManager(mgr); err != nil {
klog.Errorf("Unable to start the podstatus reconciler", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand All @@ -31,10 +32,15 @@ import (
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/utils/getters"
vkMachineryForge "github.com/liqotech/liqo/pkg/vkMachinery/forge"
vkutils "github.com/liqotech/liqo/pkg/vkMachinery/utils"
)

var (
deletionRoutineRunning = false
createNodeFalseFlag = vkutils.Flag{
Name: string(vkMachineryForge.CreateNode),
Value: strconv.FormatBool(false),
}
)

// DeletionRoutine is responsible for deleting a virtual node.
Expand Down Expand Up @@ -68,7 +74,7 @@ func (dr *DeletionRoutine) EnsureNodeAbsence(vn *virtualkubeletv1alpha1.VirtualN
}

func (dr *DeletionRoutine) run(ctx context.Context) {
defer klog.Fatal("Deletion routine stopped")
defer klog.Error("Deletion routine stopped")
for dr.processNextItem(ctx) {
}
}
Expand Down Expand Up @@ -127,11 +133,11 @@ func (dr *DeletionRoutine) handle(ctx context.Context, key string) error {

if node != nil {
if !*vn.Spec.CreateNode {
// We need to ensure that the current pods will no recreate the node before deleting it.
if found, err := dr.vnr.checkVirtualKubeletPodsContainsOnlyArg(
ctx, vn, string(vkMachineryForge.CreateNode), strconv.FormatBool(*vn.Spec.CreateNode)); err != nil || !found {
// We need to ensure that the current pods will no recreate the node after deleting it.
if found, err := vkutils.CheckVirtualKubeletFlagsConsistence(
ctx, dr.vnr.ClientVK, vn, dr.vnr.VirtualKubeletOptions, createNodeFalseFlag); err != nil || !found {
if err == nil {
err = fmt.Errorf("virtual kubelet pods do not contain only arg %s=%s", vkMachineryForge.CreateNode, strconv.FormatBool(*vn.Spec.CreateNode))
err = fmt.Errorf("virtual kubelet pods are still running with arg %s", createNodeFalseFlag.String())
}
return fmt.Errorf("error checking virtual kubelet pods: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ var _ = BeforeSuite(func() {
Expect(k8sClient).ToNot(BeNil())

vnr, err := NewVirtualNodeReconciler(ctx,
k8sClient,
k8sClient,
k8sClient,
scheme.Scheme,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package virtualnodectrl
import (
"context"
"fmt"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -29,9 +28,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/utils/getters"
"github.com/liqotech/liqo/pkg/vkMachinery"
vkforge "github.com/liqotech/liqo/pkg/vkMachinery/forge"
vkutils "github.com/liqotech/liqo/pkg/vkMachinery/utils"
)

// createVirtualKubeletDeployment creates the VirtualKubelet Deployment.
Expand Down Expand Up @@ -125,7 +124,7 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence(
// ensureVirtualKubeletDeploymentAbsence deletes the VirtualKubelet Deployment.
func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentAbsence(
ctx context.Context, virtualNode *virtualkubeletv1alpha1.VirtualNode) (reEnque bool, err error) {
virtualKubeletDeployment, err := r.getVirtualKubeletDeployment(ctx, virtualNode)
virtualKubeletDeployment, err := vkutils.GetVirtualKubeletDeployment(ctx, r.Client, virtualNode, r.VirtualKubeletOptions)
if err != nil {
klog.Error(err)
return true, err
Expand All @@ -143,7 +142,7 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentAbsence(
return true, err
}

if ok, err := r.checkVirtualKubeletPodAbsence(ctx, virtualNode); err != nil || !ok {
if ok, err := vkutils.CheckVirtualKubeletPodAbsence(ctx, r.ClientVK, virtualNode, r.VirtualKubeletOptions); err != nil || !ok {
return true, err
}

Expand All @@ -163,59 +162,3 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentAbsence(

return false, nil
}

// getVirtualKubeletDeployment returns the VirtualKubelet Deployment given a VirtualNode.
func (r *VirtualNodeReconciler) getVirtualKubeletDeployment(
ctx context.Context, virtualNode *virtualkubeletv1alpha1.VirtualNode) (*appsv1.Deployment, error) {
var deployList appsv1.DeploymentList
labels := vkforge.VirtualKubeletLabels(virtualNode, r.VirtualKubeletOptions)
if err := r.Client.List(ctx, &deployList, client.MatchingLabels(labels)); err != nil {
klog.Error(err)
return nil, err
}

if len(deployList.Items) == 0 {
klog.V(4).Infof("[%v] no VirtualKubelet deployment found", virtualNode.Spec.ClusterIdentity.ClusterID)
return nil, nil
} else if len(deployList.Items) > 1 {
err := fmt.Errorf("[%v] more than one VirtualKubelet deployment found", virtualNode.Spec.ClusterIdentity.ClusterID)
klog.Error(err)
return nil, err
}

return &deployList.Items[0], nil
}

func (r *VirtualNodeReconciler) checkVirtualKubeletPodAbsence(ctx context.Context,
vn *virtualkubeletv1alpha1.VirtualNode) (bool, error) {
klog.Warningf("[%v] checking virtual-kubelet pod absence", vn.Spec.ClusterIdentity.ClusterID)
list, err := getters.ListVirtualKubeletPodsFromVirtualNode(ctx, r.Client, vn, r.VirtualKubeletOptions)
if err != nil {
return false, err
}
return len(list.Items) == 0, err
}

func (r *VirtualNodeReconciler) checkVirtualKubeletPodsContainsOnlyArg(
ctx context.Context, vn *virtualkubeletv1alpha1.VirtualNode, flag, value string) (bool, error) {
list, err := getters.ListVirtualKubeletPodsFromVirtualNode(ctx, r.Client, vn, r.VirtualKubeletOptions)
if err != nil {
return false, err
}
for i := range list.Items {
for j := range list.Items[i].Spec.Containers {
if list.Items[i].Spec.Containers[j].Name != vkMachinery.ContainerName {
continue
}
for _, arg := range list.Items[i].Spec.Containers[j].Args {
if strings.HasPrefix(arg, flag) {
if fmt.Sprintf("--%s=%s", flag, value) != arg {
return false, nil
}
break
}
}
}
}
return true, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ const (
// VirtualNodeReconciler manage NamespaceMap lifecycle.
type VirtualNodeReconciler struct {
client.Client
ClientLocal client.Client
// Client used to list local pods
ClientLocal client.Client
// Client used to list virtual-kubelet pods
ClientVK client.Client
Scheme *runtime.Scheme
HomeClusterIdentity *discoveryv1alpha1.ClusterIdentity
VirtualKubeletOptions *vkforge.VirtualKubeletOpts
Expand All @@ -60,13 +63,14 @@ type VirtualNodeReconciler struct {
// NewVirtualNodeReconciler returns a new VirtualNodeReconciler.
func NewVirtualNodeReconciler(
ctx context.Context,
cl client.Client, cll client.Client,
cl client.Client, cll client.Client, clvk client.Client,
s *runtime.Scheme, er record.EventRecorder,
hci *discoveryv1alpha1.ClusterIdentity, vko *vkforge.VirtualKubeletOpts,
) (*VirtualNodeReconciler, error) {
vnr := &VirtualNodeReconciler{
Client: cl,
ClientLocal: cll,
ClientVK: clvk,
Scheme: s,
HomeClusterIdentity: hci,
VirtualKubeletOptions: vko,
Expand Down
16 changes: 16 additions & 0 deletions pkg/vkMachinery/utils/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2019-2023 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package utils contains some utility functions used by the virtual node controller.
package utils
106 changes: 106 additions & 0 deletions pkg/vkMachinery/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2019-2023 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"fmt"
"strings"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
"github.com/liqotech/liqo/pkg/utils/getters"
"github.com/liqotech/liqo/pkg/vkMachinery"
vkforge "github.com/liqotech/liqo/pkg/vkMachinery/forge"
)

// GetVirtualKubeletDeployment returns the VirtualKubelet Deployment of a VirtualNode.
func GetVirtualKubeletDeployment(
ctx context.Context, cl client.Client, virtualNode *virtualkubeletv1alpha1.VirtualNode,
vkopts *vkforge.VirtualKubeletOpts) (*appsv1.Deployment, error) {
var deployList appsv1.DeploymentList
labels := vkforge.VirtualKubeletLabels(virtualNode, vkopts)
if err := cl.List(ctx, &deployList, client.MatchingLabels(labels)); err != nil {
klog.Error(err)
return nil, err
}

if len(deployList.Items) == 0 {
klog.V(4).Infof("[%v] no VirtualKubelet deployment found", virtualNode.Spec.ClusterIdentity.ClusterID)
return nil, nil
} else if len(deployList.Items) > 1 {
err := fmt.Errorf("[%v] more than one VirtualKubelet deployment found", virtualNode.Spec.ClusterIdentity.ClusterID)
klog.Error(err)
return nil, err
}

return &deployList.Items[0], nil
}

// CheckVirtualKubeletPodAbsence checks if a VirtualNode's VirtualKubelet pods are absent.
func CheckVirtualKubeletPodAbsence(ctx context.Context, cl client.Client,
vn *virtualkubeletv1alpha1.VirtualNode, vkopts *vkforge.VirtualKubeletOpts) (bool, error) {
klog.V(4).Infof("[%v] checking virtual-kubelet pod absence", vn.Spec.ClusterIdentity.ClusterName)
list, err := getters.ListVirtualKubeletPodsFromVirtualNode(ctx, cl, vn, vkopts)
if err != nil {
return false, err
}
klog.V(4).Infof("[%v] found %v virtual-kubelet pods", vn.Spec.ClusterIdentity.ClusterName, len(list.Items))
return len(list.Items) == 0, err
}

// Flag is a VirtualKubelet flag.
// Name must contain the "--" prefix.
type Flag struct {
Name string
Value string
}

// String returns the flag as a string.
func (f Flag) String() string {
return fmt.Sprintf("%s=%s", f.Name, f.Value)
}

// CheckVirtualKubeletFlagsConsistence checks if the VirtualKubelet args are consistent with the flag list provided.
// It returns true if all the flags are consistent, false otherwise.
// A flag is not consistent if it is present in the VirtualKubelet args with a different value.
func CheckVirtualKubeletFlagsConsistence(
ctx context.Context, cl client.Client, vn *virtualkubeletv1alpha1.VirtualNode, vkopts *vkforge.VirtualKubeletOpts, flags ...Flag) (bool, error) {
list, err := getters.ListVirtualKubeletPodsFromVirtualNode(ctx, cl, vn, vkopts)
if err != nil {
return false, err
}
for i := range list.Items {
for j := range list.Items[i].Spec.Containers {
if list.Items[i].Spec.Containers[j].Name != vkMachinery.ContainerName {
continue
}
for _, arg := range list.Items[i].Spec.Containers[j].Args {
for _, flag := range flags {
if strings.HasPrefix(arg, flag.Name) {
if flag.String() != arg {
return false, nil
}
break
}
}
}
}
}
return true, nil
}

0 comments on commit ecb9313

Please sign in to comment.