Skip to content

Commit

Permalink
csiaddonsNode: Recreate CSIAddonsNode with active sidecar(s)
Browse files Browse the repository at this point in the history
This patch adds a watcher to the csi-addons sidecar which is
responsible for re-creating CSIAddonsNode(s) in case they
are deleted manually.

Signed-off-by: Niraj Yadav <[email protected]>
  • Loading branch information
black-dragon74 authored and mergify[bot] committed Feb 17, 2025
1 parent 6e0f6e0 commit 4cdbe23
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 14 deletions.
103 changes: 93 additions & 10 deletions sidecar/internal/csiaddonsnode/csiaddonsnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import (

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -45,6 +48,11 @@ const (
nodeCreationRetry = time.Minute * 5
// nodeCreationTimeout is the time after which the context for node creation request is cancelled.
nodeCreationTimeout = time.Minute * 3

// watcherRetryCount is the number of times a watcher creation would be retried for.
watcherRetryCount = 10
// watcherRetryDelay is the amount of time to wait before trying recreation of a watcher.
watcherRetryDelay = time.Second * 5
)

var (
Expand Down Expand Up @@ -84,16 +92,10 @@ type Manager struct {
PodUID string
}

// Deploy creates CSIAddonsNode custom resource with all required information.
// When information to create the CSIAddonsNode is missing, an error will be
// returned immediately. If creating the CSIAddonsNode in the Kubernetes
// cluster fails (missing CRD, RBAC limitations, ...), an error will be logged,
// and creation will be retried.
func (mgr *Manager) Deploy() error {
object, err := mgr.getCSIAddonsNode()
if err != nil {
return fmt.Errorf("failed to get csiaddonsNode object: %w", err)
}
// deploy creates CSIAddonsNode custom resource with all required information.
// If creating the CSIAddonsNode in the Kubernetes cluster fails (missing CRD, RBAC limitations, ...)
// an error will be logged and creation will be retried.
func (mgr *Manager) deploy(object *csiaddonsv1alpha1.CSIAddonsNode) error {

// loop until the CSIAddonsNode has been created
return wait.PollUntilContextTimeout(context.TODO(), nodeCreationRetry, nodeCreationTimeout, true, func(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -133,6 +135,10 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro
},
}
_, err = controllerutil.CreateOrUpdate(ctx, cli, csiaddonNode, func() error {
if !csiaddonNode.DeletionTimestamp.IsZero() {
return errors.New("csiaddonnode is being deleted")
}

// update the resourceVersion
resourceVersion := csiaddonNode.ResourceVersion
if resourceVersion != "" {
Expand Down Expand Up @@ -275,3 +281,80 @@ func generateName(nodeID, namespace, ownerKind, ownerName string) (string, error

return base, nil
}

// watchCSIAddonsNode ensures the given CSIAddonsNode exists and then watches
// that single object (selected by namespace and metadata.name) for events.
//
// When a Deleted event is received the object is recreated via mgr.deploy().
// When an Error event is received the watch is considered broken and an error
// is returned so that the caller can restart the watcher with its retry and
// backoff logic, instead of treating the subsequent channel close as a
// graceful exit.
//
// It returns nil only when the result channel is closed without any error
// event having been delivered. A failure of the initial deploy terminates the
// process through klog.Fatalf.
func (mgr *Manager) watchCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) error {
	// Call deploy on start, this takes care of the cases where
	// a watcher might exit due to an error while trying to
	// recreate CSIAddonsNode in the cluster
	err := mgr.deploy(node)
	if err != nil {
		klog.Fatalf("Failed to create csiaddonsnode: %v", err)
	}

	klog.Infof("Starting watcher for CSIAddonsNode: %s", node.Name)

	dynamicClient, err := dynamic.NewForConfig(mgr.Config)
	if err != nil {
		return fmt.Errorf("failed to create dynamic client: %w", err)
	}

	gvr := schema.GroupVersionResource{
		Group:    csiaddonsv1alpha1.GroupVersion.Group,
		Version:  csiaddonsv1alpha1.GroupVersion.Version,
		Resource: "csiaddonsnodes",
	}

	// Restrict the watch to the one object this sidecar owns.
	watcher, err := dynamicClient.Resource(gvr).Namespace(node.Namespace).Watch(context.Background(), v1.ListOptions{
		FieldSelector: fmt.Sprintf("metadata.name=%s", node.Name),
	})
	if err != nil {
		return fmt.Errorf("failed to watch CSIAddonsNode objects: %w", err)
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		switch event.Type {
		case watch.Deleted:
			klog.Infof("WARNING: An active CSIAddonsNode: %s was deleted, it will be recreated", node.Name)

			if err := mgr.deploy(node); err != nil {
				return fmt.Errorf("failed to recreate CSIAddonsNode: %w", err)
			}
			klog.Infof("CSIAddonsNode: %s recreated", node.Name)

		case watch.Error:
			// An Error event means the watch is no longer reliable. Surface
			// it to the caller rather than silently dropping it.
			if status, ok := event.Object.(*v1.Status); ok {
				return fmt.Errorf("watcher for CSIAddonsNode %s received an error event: %s", node.Name, status.Message)
			}
			return fmt.Errorf("watcher for CSIAddonsNode %s received an error event: %v", node.Name, event.Object)
		}
	}

	// The channel was closed by the API server without any errors
	// Simply log it here and return, the dispatcher is responsible
	// for restarting the watcher
	klog.Infof("Watcher for %s exited gracefully, will be restarted soon", node.Name)

	return nil
}

// DispatchWatcher builds the CSIAddonsNode object for this sidecar and keeps a
// watcher running for it.
//
// Every time the watcher exits with an error the failure is logged and the
// watcher is restarted after a delay that grows linearly with the number of
// consecutive failures (watcherRetryDelay * attempt). A watcher that exits
// without an error resets the failure counter and is restarted immediately.
//
// It returns an error when the CSIAddonsNode object cannot be built (the
// underlying cause is wrapped) or when watcherRetryCount consecutive watcher
// failures have occurred. It does not return otherwise.
func (mgr *Manager) DispatchWatcher() error {
	node, err := mgr.getCSIAddonsNode()
	if err != nil {
		// Wrap the cause so the caller's log shows why the object could not be built.
		return fmt.Errorf("failed to get CSIAddonsNode object: %w", err)
	}

	retryCount := 0
	for retryCount < watcherRetryCount {
		err := mgr.watchCSIAddonsNode(node)
		if err == nil {
			// Graceful exit: only consecutive failures count towards the limit.
			retryCount = 0
			continue
		}

		klog.Errorf("Watcher for %s exited, retrying (%d/%d), error: %v", node.Name, retryCount+1, watcherRetryCount, err)

		retryCount++
		time.Sleep(watcherRetryDelay * time.Duration(retryCount))
	}

	return fmt.Errorf("watcher for %s reached max retries, giving up", node.Name)
}
13 changes: 9 additions & 4 deletions sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,15 @@ func main() {
PodNamespace: *podNamespace,
PodUID: *podUID,
}
err = nodeMgr.Deploy()
if err != nil {
klog.Fatalf("Failed to create csiaddonsnode: %v", err)
}

// Start the watcher; it is responsible for fetching the
// CSIAddonsNode object, deploying it and recreating it if deleted.
go func() {
err := nodeMgr.DispatchWatcher()
if err != nil {
klog.Fatalf("failed to start watcher due to error: %v", err)
}
}()

sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort, kubeClient, *enableAuthChecks)
sidecarServer.RegisterService(service.NewIdentityServer(csiClient.GetGRPCClient()))
Expand Down

0 comments on commit 4cdbe23

Please sign in to comment.