Skip to content

Commit

Permalink
csiaddonsNode: Recreate CSIAddonsNode with active sidecar(s)
Browse files Browse the repository at this point in the history
This patch adds a watcher to the csi-addons sidecar which is
responsible for re-creating CSIAddonsNode(s) in case they
are deleted manually.

Signed-off-by: Niraj Yadav <[email protected]>
  • Loading branch information
black-dragon74 committed Feb 14, 2025
1 parent 509ea1a commit a10afd9
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 14 deletions.
96 changes: 85 additions & 11 deletions sidecar/internal/csiaddonsnode/csiaddonsnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import (

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -45,6 +48,11 @@ const (
nodeCreationRetry = time.Minute * 5
// nodeCreationTimeout is the time after which the context for node creation request is cancelled.
nodeCreationTimeout = time.Minute * 3

// watcherRetryCount is the maximum number of consecutive times watcher creation is retried.
watcherRetryCount = 10
// watcherRetryDelay is the amount of time to wait before trying recreation of a watcher.
watcherRetryDelay = time.Minute
)

var (
Expand Down Expand Up @@ -85,15 +93,9 @@ type Manager struct {
}

// Deploy creates CSIAddonsNode custom resource with all required information.
// When information to create the CSIAddonsNode is missing, an error will be
// returned immediately. If creating the CSIAddonsNode in the Kubernetes
// cluster fails (missing CRD, RBAC limitations, ...), an error will be logged,
// and creation will be retried.
func (mgr *Manager) Deploy() error {
object, err := mgr.getCSIAddonsNode()
if err != nil {
return fmt.Errorf("failed to get csiaddonsNode object: %w", err)
}
// If creating the CSIAddonsNode in the Kubernetes cluster fails (missing CRD, RBAC limitations, ...)
// an error will be logged and creation will be retried.
func (mgr *Manager) Deploy(object *csiaddonsv1alpha1.CSIAddonsNode) error {

// loop until the CSIAddonsNode has been created
return wait.PollUntilContextTimeout(context.TODO(), nodeCreationRetry, nodeCreationTimeout, true, func(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -133,6 +135,10 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro
},
}
_, err = controllerutil.CreateOrUpdate(ctx, cli, csiaddonNode, func() error {
if !csiaddonNode.DeletionTimestamp.IsZero() {
return errors.New("csiaddonnode is being deleted")
}

// update the resourceVersion
resourceVersion := csiaddonNode.ResourceVersion
if resourceVersion != "" {
Expand All @@ -151,8 +157,8 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro
return nil
}

// getCSIAddonsNode fills required information and return CSIAddonsNode object.
func (mgr *Manager) getCSIAddonsNode() (*csiaddonsv1alpha1.CSIAddonsNode, error) {
// GetCSIAddonsNode fills in the required information and returns a CSIAddonsNode object.
func (mgr *Manager) GetCSIAddonsNode() (*csiaddonsv1alpha1.CSIAddonsNode, error) {
if mgr.PodName == "" {
return nil, fmt.Errorf("%w: missing Pod name", errInvalidConfig)
}
Expand Down Expand Up @@ -275,3 +281,71 @@ func generateName(nodeID, namespace, ownerKind, ownerName string) (string, error

return base, nil
}

// watchCSIAddonsNode deploys the given CSIAddonsNode and then watches that
// single object (selected by metadata.name in the Pod's namespace) for
// deletion. Every time a Deleted event is observed, the object is deployed
// again through mgr.Deploy().
//
// The function returns nil once the watch result channel is closed, and an
// error when the dynamic client or the watch cannot be set up, or when a
// re-deploy after a deletion fails. A failure of the initial deploy is
// reported through klog.Fatalf rather than returned.
func (mgr *Manager) watchCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) error {
	// Deploying up front also covers a restart of this watcher after a
	// previous run exited because recreating the CSIAddonsNode failed.
	if deployErr := mgr.Deploy(node); deployErr != nil {
		klog.Fatalf("Failed to create csiaddonsnode: %v", deployErr)
	}

	klog.Infof("Starting watcher for CSIAddonsNode: %s", node.Name)

	client, err := dynamic.NewForConfig(mgr.Config)
	if err != nil {
		return fmt.Errorf("failed to create dynamic client: %w", err)
	}

	resource := schema.GroupVersionResource{
		Group:    csiaddonsv1alpha1.GroupVersion.Group,
		Version:  csiaddonsv1alpha1.GroupVersion.Version,
		Resource: "csiaddonsnodes",
	}

	// Restrict the watch to the one object this sidecar owns.
	opts := v1.ListOptions{
		FieldSelector: "metadata.name=" + node.Name,
	}

	nodeWatch, err := client.Resource(resource).Namespace(mgr.PodNamespace).Watch(context.Background(), opts)
	if err != nil {
		return fmt.Errorf("failed to watch CSIAddonsNode objects: %w", err)
	}
	defer nodeWatch.Stop()

	for ev := range nodeWatch.ResultChan() {
		// Only deletions are of interest; every other event type is ignored.
		if ev.Type != watch.Deleted {
			continue
		}

		klog.Infof("WARNING: An active CSIAddonsNode: %s was deleted, it will be recreated", node.Name)

		if deployErr := mgr.Deploy(node); deployErr != nil {
			return fmt.Errorf("failed to recreate CSIAddonsNode: %w", deployErr)
		}
		klog.Infof("CSIAddonsNode: %s recreated", node.Name)
	}

	return nil
}

// DispatchWatcher runs the watcher for the specified CSIAddonsNode and
// restarts it whenever it exits.
//
// When the watcher exits without an error it is restarted immediately and the
// failure counter is reset. When it exits with an error, the error is logged,
// the counter is incremented and the watcher is restarted after
// watcherRetryDelay. Once watcherRetryCount consecutive failures have been
// recorded, DispatchWatcher gives up and returns an error without waiting for
// another watcherRetryDelay.
//
// This function only returns on failure; callers are expected to run it in
// its own goroutine.
func (mgr *Manager) DispatchWatcher(node *csiaddonsv1alpha1.CSIAddonsNode) error {
	failures := 0

	for failures < watcherRetryCount {
		err := mgr.watchCSIAddonsNode(node)
		if err == nil {
			// The watcher ended cleanly, start over with a fresh budget.
			failures = 0
			continue
		}

		failures++
		klog.Errorf("Watcher for %s exited, retrying (%d/%d), error: %v", node.Name, failures, watcherRetryCount, err)

		// Only back off when another attempt will actually follow; sleeping
		// after the final failure would just delay reporting the error.
		if failures < watcherRetryCount {
			time.Sleep(watcherRetryDelay)
		}
	}

	return fmt.Errorf("watcher for %s reached max retries, giving up", node.Name)
}
2 changes: 1 addition & 1 deletion sidecar/internal/csiaddonsnode/csiaddonsnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func Test_getCSIAddonsNode(t *testing.T) {
mgr.Endpoint = tt.args.endpoint
mgr.Node = tt.args.nodeID

got, err := mgr.getCSIAddonsNode()
got, err := mgr.GetCSIAddonsNode()
if (err != nil) != tt.wantErr {
t.Errorf("getCSIAddonsNode() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down
16 changes: 14 additions & 2 deletions sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"flag"
"os"
"time"

"github.com/csi-addons/kubernetes-csi-addons/internal/sidecar/service"
Expand Down Expand Up @@ -106,11 +107,22 @@ func main() {
PodNamespace: *podNamespace,
PodUID: *podUID,
}
err = nodeMgr.Deploy()

nodeObj, err := nodeMgr.GetCSIAddonsNode()
if err != nil {
klog.Fatalf("Failed to create csiaddonsnode: %v", err)
klog.Fatalf("failed to get csiaddonsNode object: %v", err)
}

// Start the watcher, it calls nodeMgr.Deploy() internally
go func() {
err := nodeMgr.DispatchWatcher(nodeObj)
if err != nil {
klog.Errorf("Watcher for %s failed due to error: %v", nodeObj.Name, err)

os.Exit(1)
}
}()

sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort, kubeClient, *enableAuthChecks)
sidecarServer.RegisterService(service.NewIdentityServer(csiClient.GetGRPCClient()))
sidecarServer.RegisterService(service.NewReclaimSpaceServer(csiClient.GetGRPCClient(), kubeClient, *stagingPath))
Expand Down

0 comments on commit a10afd9

Please sign in to comment.