Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csiaddonsNode: Recreate CSIAddonsNode with active sidecar(s) #765

Merged
merged 1 commit into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 93 additions & 10 deletions sidecar/internal/csiaddonsnode/csiaddonsnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import (

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -45,6 +48,11 @@ const (
nodeCreationRetry = time.Minute * 5
// nodeCreationTimeout is the time after which the context for node creation request is cancelled.
nodeCreationTimeout = time.Minute * 3

// watcherRetryCount is the maximum number of consecutive times the watcher is restarted after exiting with an error.
watcherRetryCount = 10
// watcherRetryDelay is the base delay before restarting a failed watcher; it is multiplied by the current retry count.
watcherRetryDelay = time.Second * 5
)

var (
Expand Down Expand Up @@ -84,16 +92,10 @@ type Manager struct {
PodUID string
}

// Deploy creates CSIAddonsNode custom resource with all required information.
// When information to create the CSIAddonsNode is missing, an error will be
// returned immediately. If creating the CSIAddonsNode in the Kubernetes
// cluster fails (missing CRD, RBAC limitations, ...), an error will be logged,
// and creation will be retried.
func (mgr *Manager) Deploy() error {
object, err := mgr.getCSIAddonsNode()
if err != nil {
return fmt.Errorf("failed to get csiaddonsNode object: %w", err)
}
// deploy creates CSIAddonsNode custom resource with all required information.
// If creating the CSIAddonsNode in the Kubernetes cluster fails (missing CRD, RBAC limitations, ...)
// an error will be logged and creation will be retried.
func (mgr *Manager) deploy(object *csiaddonsv1alpha1.CSIAddonsNode) error {

// loop until the CSIAddonsNode has been created
return wait.PollUntilContextTimeout(context.TODO(), nodeCreationRetry, nodeCreationTimeout, true, func(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -133,6 +135,10 @@ func (mgr *Manager) newCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) erro
},
}
_, err = controllerutil.CreateOrUpdate(ctx, cli, csiaddonNode, func() error {
if !csiaddonNode.DeletionTimestamp.IsZero() {
return errors.New("csiaddonnode is being deleted")
}

// update the resourceVersion
resourceVersion := csiaddonNode.ResourceVersion
if resourceVersion != "" {
Expand Down Expand Up @@ -275,3 +281,80 @@ func generateName(nodeID, namespace, ownerKind, ownerName string) (string, error

return base, nil
}

// watchCSIAddonsNode makes sure the given CSIAddonsNode exists and then
// watches that single object (selected by namespace and metadata.name),
// recreating it through mgr.deploy() whenever a Deleted event is received.
//
// It returns nil when the watch channel is closed without an error event; the
// caller (the dispatcher) is expected to start a new watcher in that case.
// It returns a non-nil error when the dynamic client or the watch cannot be
// created, when the watch delivers an Error event, or when recreating a
// deleted CSIAddonsNode fails. A failure of the initial deploy terminates the
// process through klog.Fatalf.
func (mgr *Manager) watchCSIAddonsNode(node *csiaddonsv1alpha1.CSIAddonsNode) error {
	// Call deploy on start, this takes care of the cases where
	// a watcher might exit due to an error while trying to
	// recreate CSIAddonsNode in the cluster
	err := mgr.deploy(node)
	if err != nil {
		klog.Fatalf("Failed to create csiaddonsnode: %v", err)
	}

	klog.Infof("Starting watcher for CSIAddonsNode: %s", node.Name)

	dynamicClient, err := dynamic.NewForConfig(mgr.Config)
	if err != nil {
		return fmt.Errorf("failed to create dynamic client: %w", err)
	}

	gvr := schema.GroupVersionResource{
		Group:    csiaddonsv1alpha1.GroupVersion.Group,
		Version:  csiaddonsv1alpha1.GroupVersion.Version,
		Resource: "csiaddonsnodes",
	}

	// Restrict the watch to the one object this sidecar owns.
	watcher, err := dynamicClient.Resource(gvr).Namespace(node.Namespace).Watch(context.Background(), v1.ListOptions{
		FieldSelector: fmt.Sprintf("metadata.name=%s", node.Name),
	})
	if err != nil {
		return fmt.Errorf("failed to watch CSIAddonsNode objects: %w", err)
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		switch event.Type {
		case watch.Deleted:
			klog.Infof("WARNING: An active CSIAddonsNode: %s was deleted, it will be recreated", node.Name)

			if err := mgr.deploy(node); err != nil {
				return fmt.Errorf("failed to recreate CSIAddonsNode: %w", err)
			}
			klog.Infof("CSIAddonsNode: %s recreated", node.Name)

		case watch.Error:
			// Surface watch failures instead of dropping them, so the
			// caller sees an error rather than a "graceful" exit.
			return fmt.Errorf("watcher for CSIAddonsNode %s received an error event: %v", node.Name, event.Object)
		}
	}

	// The channel was closed by the API server without any errors
	// Simply log it here and return, the dispatcher is responsible
	// for restarting the watcher
	klog.Infof("Watcher for %s exited gracefully, will be restarted soon", node.Name)

	return nil
}

// DispatchWatcher builds the CSIAddonsNode object for this sidecar and keeps a
// watcher running for it.
//
// Each time the watcher exits with an error, the retry counter is incremented
// and the next attempt is delayed by watcherRetryDelay multiplied by the
// counter. When the watcher exits without an error the counter is reset and
// the watcher is restarted immediately.
//
// It returns an error if the CSIAddonsNode object cannot be built, or once
// watcherRetryCount consecutive watcher runs have failed. It does not return
// otherwise.
func (mgr *Manager) DispatchWatcher() error {
	node, err := mgr.getCSIAddonsNode()
	if err != nil {
		// Keep the underlying cause so the caller can log or inspect it.
		return fmt.Errorf("failed to get CSIAddonsNode object: %w", err)
	}

	retryCount := 0
	for retryCount < watcherRetryCount {
		err := mgr.watchCSIAddonsNode(node)
		if err == nil {
			// Clean exit: forget earlier failures and restart right away.
			retryCount = 0
			continue
		}

		retryCount++
		klog.Errorf("Watcher for %s exited, retrying (%d/%d), error: %v", node.Name, retryCount, watcherRetryCount, err)

		// Back off linearly with the number of consecutive failures.
		time.Sleep(watcherRetryDelay * time.Duration(retryCount))
	}

	return fmt.Errorf("watcher for %s reached max retries, giving up", node.Name)
}
13 changes: 9 additions & 4 deletions sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,15 @@ func main() {
PodNamespace: *podNamespace,
PodUID: *podUID,
}
err = nodeMgr.Deploy()
if err != nil {
klog.Fatalf("Failed to create csiaddonsnode: %v", err)
}

// Start the watcher; it is responsible for building the
// CSIAddonsNode object and then calling deploy()
go func() {
err := nodeMgr.DispatchWatcher()
if err != nil {
klog.Fatalf("failed to start watcher due to error: %v", err)
}
}()

sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort, kubeClient, *enableAuthChecks)
sidecarServer.RegisterService(service.NewIdentityServer(csiClient.GetGRPCClient()))
Expand Down
Loading