Skip to content

Commit

Permalink
csiaddonsNode: Recreate CSIAddonsNode with active sidecar(s)
Browse files Browse the repository at this point in the history
This patch adds a watcher to csi-addons sidecar which is
responsible for re-creating CSIAddonNode(s) in case they
are deleted manually.

Signed-off-by: Niraj Yadav <[email protected]>
  • Loading branch information
black-dragon74 committed Feb 14, 2025
1 parent 509ea1a commit a1ef307
Showing 1 changed file with 87 additions and 1 deletion.
88 changes: 87 additions & 1 deletion sidecar/internal/csiaddonsnode/csiaddonsnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1"
"github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/client"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -45,12 +49,23 @@ const (
nodeCreationRetry = time.Minute * 5
// nodeCreationTimeout is the time after which the context for node creation request is cancelled.
nodeCreationTimeout = time.Minute * 3

// watcherRetryCount is the number of times a watcher creation would be retried for.
watcherRetryCount = 5
// watcherRetryDelay is the amount of time to wait before trying recreation of a watcher.
watcherRetryDelay = time.Minute //FIXME: Reduce the delay befor pushing
)

var (
// errInvalidConfig is returned when an invalid configuration setting
// is detected.
errInvalidConfig = errors.New("invalid configuration")

// watcherMutex is for thread safe access of nodeWatchers map
watcherMutex = &sync.Mutex{}
// nodeWatchers map ensures that duplicate watchers do not exist for the same resource.
// Use with `watcherMuxtex` for thread safe access
nodeWatchers = make(map[string]bool)
)

// Manager is a helper that creates the CSIAddonsNode for the running sidecar.
Expand Down Expand Up @@ -133,8 +148,12 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro
},
}
_, err = controllerutil.CreateOrUpdate(ctx, cli, csiaddonNode, func() error {
if !csiaddonNode.DeletionTimestamp.IsZero() {
return errors.New("csiaddonnode is being deleted")
}

// update the resourceVersion
resourceVersion := csiaddonNode.ResourceVersion
resourceVersion := node.ResourceVersion
if resourceVersion != "" {
csiaddonNode.ResourceVersion = resourceVersion
}
Expand All @@ -148,6 +167,14 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro
return fmt.Errorf("failed to create/update csiaddonsnode object: %w", err)
}

watcherMutex.Lock()
if !nodeWatchers[node.Name] {
go mgr.dispatchWatcher(node.Name)

nodeWatchers[node.Name] = true
}
watcherMutex.Unlock()

return nil
}

Expand Down Expand Up @@ -275,3 +302,62 @@ func generateName(nodeID, namespace, ownerKind, ownerName string) (string, error

return base, nil
}

func (mgr *Manager) watchCSIAddonsNode(name string) error {
klog.Infof("Starting watcher for CSIAddonsNode: %s", name)

dynamicClient, err := dynamic.NewForConfig(mgr.Config)
if err != nil {
return fmt.Errorf("failed to create dynamic client: %w", err)
}

gvr := schema.GroupVersionResource{
Group: csiaddonsv1alpha1.GroupVersion.Group,
Version: csiaddonsv1alpha1.GroupVersion.Version,
Resource: "csiaddonsnodes",
}

watcher, err := dynamicClient.Resource(gvr).Namespace(mgr.PodNamespace).Watch(context.Background(), v1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
})
if err != nil {
return fmt.Errorf("failed to watch CSIAddonsNode objects: %w", err)
}
defer watcher.Stop()

for event := range watcher.ResultChan() {
switch event.Type {
case watch.Deleted:
klog.Infof("WARNING: An active CSIAddonsNode: %s was deleted, it will be recreated", name)

err = mgr.Deploy()
if err != nil {
return fmt.Errorf("failed to recreate CSIAddonsNode: %w", err)
}
klog.Infof("CSIAddonsNode: %s recreated", name)
}
}

return nil
}

func (mgr *Manager) dispatchWatcher(name string) {
retryCount := 0

for retryCount < int(watcherRetryCount) {
err := mgr.watchCSIAddonsNode(name)
if err != nil {
klog.Errorf("Watcher for %s exited, retrying (%d/%d), error: %v", name, retryCount+1, watcherRetryCount, err)

retryCount++
time.Sleep(watcherRetryDelay)
} else {
retryCount = 0
}
}
klog.Errorf("watcher for %s reached max retries, giving up", name)

watcherMutex.Lock()
delete(nodeWatchers, name) // Should we os.Exit(1)?
watcherMutex.Unlock()
}

0 comments on commit a1ef307

Please sign in to comment.