From a10afd9be2fa1b6048851ae4996604182d9effed Mon Sep 17 00:00:00 2001 From: Niraj Yadav Date: Thu, 13 Feb 2025 22:10:10 +0530 Subject: [PATCH] csiaddonsNode: Recreate CSIAddonsNode with active sidecar(s) This patch adds a watcher to csi-addons sidecar which is responsible for re-creating CSIAddonNode(s) in case they are deleted manually. Signed-off-by: Niraj Yadav --- .../internal/csiaddonsnode/csiaddonsnode.go | 96 ++++++++++++++++--- .../csiaddonsnode/csiaddonsnode_test.go | 2 +- sidecar/main.go | 16 +++- 3 files changed, 100 insertions(+), 14 deletions(-) diff --git a/sidecar/internal/csiaddonsnode/csiaddonsnode.go b/sidecar/internal/csiaddonsnode/csiaddonsnode.go index cc7ff7354..1a0a18805 100644 --- a/sidecar/internal/csiaddonsnode/csiaddonsnode.go +++ b/sidecar/internal/csiaddonsnode/csiaddonsnode.go @@ -30,8 +30,11 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -45,6 +48,11 @@ const ( nodeCreationRetry = time.Minute * 5 // nodeCreationTimeout is the time after which the context for node creation request is cancelled. nodeCreationTimeout = time.Minute * 3 + + // watcherRetryCount is the number of times a watcher creation would be retried for. + watcherRetryCount = 10 + // watcherRetryDelay is the amount of time to wait before trying recreation of a watcher. + watcherRetryDelay = time.Minute ) var ( @@ -85,15 +93,9 @@ type Manager struct { } // Deploy creates CSIAddonsNode custom resource with all required information. -// When information to create the CSIAddonsNode is missing, an error will be -// returned immediately. If creating the CSIAddonsNode in the Kubernetes -// cluster fails (missing CRD, RBAC limitations, ...), an error will be logged, -// and creation will be retried. -func (mgr *Manager) Deploy() error { - object, err := mgr.getCSIAddonsNode() - if err != nil { - return fmt.Errorf("failed to get csiaddonsNode object: %w", err) - } +// If creating the CSIAddonsNode in the Kubernetes cluster fails (missing CRD, RBAC limitations, ...) +// an error will be logged and creation will be retried. +func (mgr *Manager) Deploy(object *csiaddonsv1alpha1.CSIAddonsNode) error { // loop until the CSIAddonsNode has been created return wait.PollUntilContextTimeout(context.TODO(), nodeCreationRetry, nodeCreationTimeout, true, func(ctx context.Context) (bool, error) { @@ -133,6 +135,10 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro }, } _, err = controllerutil.CreateOrUpdate(ctx, cli, csiaddonNode, func() error { + if !csiaddonNode.DeletionTimestamp.IsZero() { + return errors.New("csiaddonnode is being deleted") + } + // update the resourceVersion resourceVersion := csiaddonNode.ResourceVersion if resourceVersion != "" { @@ -151,8 +157,8 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro return nil } -// getCSIAddonsNode fills required information and return CSIAddonsNode object. -func (mgr *Manager) getCSIAddonsNode() (*csiaddonsv1alpha1.CSIAddonsNode, error) { +// GetCSIAddonsNode fills required information and return CSIAddonsNode object. +func (mgr *Manager) GetCSIAddonsNode() (*csiaddonsv1alpha1.CSIAddonsNode, error) { if mgr.PodName == "" { return nil, fmt.Errorf("%w: missing Pod name", errInvalidConfig) } @@ -275,3 +281,71 @@ func generateName(nodeID, namespace, ownerKind, ownerName string) (string, error return base, nil } + +// watchCSIAddonsNode starts a watcher for a specific CSIAddonsNode resource identified by its name. +// If a CSIAddonsNode is deleted, it logs a warning and attempts to recreate it using mgr.Deploy() +func (mgr *Manager) watchCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) error { + // Call deploy on start, this takes care of the cases where + // a watcher might exit due to an error while trying to + // recreate CSIAddonsNode in the cluster + err := mgr.Deploy(node) + if err != nil { + klog.Fatalf("Failed to create csiaddonsnode: %v", err) + } + + klog.Infof("Starting watcher for CSIAddonsNode: %s", node.Name) + + dynamicClient, err := dynamic.NewForConfig(mgr.Config) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + gvr := schema.GroupVersionResource{ + Group: csiaddonsv1alpha1.GroupVersion.Group, + Version: csiaddonsv1alpha1.GroupVersion.Version, + Resource: "csiaddonsnodes", + } + + watcher, err := dynamicClient.Resource(gvr).Namespace(mgr.PodNamespace).Watch(context.Background(), v1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", node.Name), + }) + if err != nil { + return fmt.Errorf("failed to watch CSIAddonsNode objects: %w", err) + } + defer watcher.Stop() + + for event := range watcher.ResultChan() { + switch event.Type { + case watch.Deleted: + klog.Infof("WARNING: An active CSIAddonsNode: %s was deleted, it will be recreated", node.Name) + + err := mgr.Deploy(node) + if err != nil { + return fmt.Errorf("failed to recreate CSIAddonsNode: %w", err) + } + klog.Infof("CSIAddonsNode: %s recreated", node.Name) + } + } + + return nil +} + +// DispatchWatcher starts a watcher for the specified CSIAddonsNode and retries +// if the watcher exits due to an error. It will retry up to a maximum number of +// attempts defined by watcherRetryCount before returning an error. +func (mgr *Manager) DispatchWatcher(node *csiaddonsv1alpha1.CSIAddonsNode) error { + retryCount := 0 + + for retryCount < int(watcherRetryCount) { + err := mgr.watchCSIAddonsNode(node) + if err != nil { + klog.Errorf("Watcher for %s exited, retrying (%d/%d), error: %v", node.Name, retryCount+1, watcherRetryCount, err) + + retryCount++ + time.Sleep(watcherRetryDelay) + } else { + retryCount = 0 + } + } + return fmt.Errorf("watcher for %s reached max retries, giving up", node.Name) +} diff --git a/sidecar/internal/csiaddonsnode/csiaddonsnode_test.go b/sidecar/internal/csiaddonsnode/csiaddonsnode_test.go index 199e04ea2..581a51690 100644 --- a/sidecar/internal/csiaddonsnode/csiaddonsnode_test.go +++ b/sidecar/internal/csiaddonsnode/csiaddonsnode_test.go @@ -130,7 +130,7 @@ func Test_getCSIAddonsNode(t *testing.T) { mgr.Endpoint = tt.args.endpoint mgr.Node = tt.args.nodeID - got, err := mgr.getCSIAddonsNode() + got, err := mgr.GetCSIAddonsNode() if (err != nil) != tt.wantErr { t.Errorf("getCSIAddonsNode() error = %v, wantErr %v", err, tt.wantErr) } diff --git a/sidecar/main.go b/sidecar/main.go index 2fdc3e6d9..0f0ce89c1 100644 --- a/sidecar/main.go +++ b/sidecar/main.go @@ -19,6 +19,7 @@ package main import ( "context" "flag" + "os" "time" "github.com/csi-addons/kubernetes-csi-addons/internal/sidecar/service" @@ -106,11 +107,22 @@ func main() { PodNamespace: *podNamespace, PodUID: *podUID, } - err = nodeMgr.Deploy() + + nodeObj, err := nodeMgr.GetCSIAddonsNode() if err != nil { - klog.Fatalf("Failed to create csiaddonsnode: %v", err) + klog.Fatalf("failed to get csiaddonsNode object: %v", err) } + // Start the watcher, it calls nodeMgr.Deploy() internally + go func() { + err := nodeMgr.DispatchWatcher(nodeObj) + if err != nil { + klog.Errorf("Watcher for %s failed due to error: %v", nodeObj.Name, err) + + os.Exit(1) + } + }() + sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort, kubeClient, *enableAuthChecks) sidecarServer.RegisterService(service.NewIdentityServer(csiClient.GetGRPCClient())) sidecarServer.RegisterService(service.NewReclaimSpaceServer(csiClient.GetGRPCClient(), kubeClient, *stagingPath))