From 246770eb2c57c006c2d30b8362dda2334672c36a Mon Sep 17 00:00:00 2001
From: Ido Heyvi
Date: Wed, 11 Jun 2025 09:33:31 +0300
Subject: [PATCH] feat: Adding support for shared-requestor flow

Signed-off-by: Ido Heyvi
---
 docs/automatic-ofed-upgrade.md    |  15 ++
 pkg/upgrade/common_manager.go     |  53 ++++--
 pkg/upgrade/consts.go             |   9 +-
 pkg/upgrade/upgrade_inplace.go    |   4 +
 pkg/upgrade/upgrade_requestor.go  | 205 ++++++++++++++++++----
 pkg/upgrade/upgrade_state.go      |   3 +-
 pkg/upgrade/upgrade_state_test.go | 282 ++++++++++++++++++++++++++++--
 pkg/upgrade/upgrade_suit_test.go  |  10 +-
 pkg/upgrade/util.go               |  10 ++
 9 files changed, 518 insertions(+), 73 deletions(-)

diff --git a/docs/automatic-ofed-upgrade.md b/docs/automatic-ofed-upgrade.md
index ea4911a..d221fa2 100644
--- a/docs/automatic-ofed-upgrade.md
+++ b/docs/automatic-ofed-upgrade.md
@@ -114,6 +114,21 @@ controller manager watchers:
 > Meaning in case node undergoes upgrade prior to enabling `requestor` mode, node will continue `inplace` upgrade mode. Only after `requestor` mode is set, and upgrade
 > controller has set nodes state to be upgrade-required, only then new requestor mode will take place.
 
+###### shared-requestor
+The requestor mode supports a `shared-requestor` flow, where multiple operators coordinate node maintenance operations on the same nodes:
+Assumptions:
+1. A cluster admin who requires the `shared-requestor` flow must ensure that all operators utilizing the maintenance OP use the same upgrade policy spec (same drainSpec).
+2. To accommodate both GPU and network driver upgrades, `DrainSpec.PodSelector` should be set accordingly (hard-coded).
+   * podSelector: `nvidia.com/ofed-driver-upgrade-drain.skip!=true,nvidia.com/gpu-driver-upgrade-drain.skip!=true`
+3. No custom `NodeMaintenanceNamePrefix` should be used. The requestor uses `DefaultNodeMaintenanceNamePrefix` as the common prefix for nodeMaintenance names.
+Flow:
+1. Each operator adds its dedicated operator label to the nodeMaintenance object
+2. When a nodeMaintenance object already exists, additional operators append their requestorID to the spec.AdditionalRequestors list
+3. During `uncordon-required` completion:
+   - Non-owning operators remove themselves from the spec.AdditionalRequestors list using optimistic locking
+   - Each operator removes its dedicated label from the nodeMaintenance object
+4. 
The owning nodeMaintenance operator handles the actual, client side, deletion of the nodeMaintenance object + ### Troubleshooting #### Node is in `upgrade-failed` state * Drain the node manually by running `kubectl drain --ignore-daemonsets` diff --git a/pkg/upgrade/common_manager.go b/pkg/upgrade/common_manager.go index 25e072a..8291719 100644 --- a/pkg/upgrade/common_manager.go +++ b/pkg/upgrade/common_manager.go @@ -94,6 +94,7 @@ type CommonUpgradeManagerImpl struct { ValidationManager ValidationManager SafeDriverLoadManager SafeDriverLoadManager + shouldSkipUpgradeDoneFunc func(requestorID string, nodeState *NodeUpgradeState) (bool, error) // optional states podDeletionStateEnabled bool validationStateEnabled bool @@ -104,6 +105,7 @@ func NewCommonUpgradeStateManager( log logr.Logger, k8sConfig *rest.Config, scheme *runtime.Scheme, + skipFunc func(requestorID string, nodeState *NodeUpgradeState) (bool, error), eventRecorder record.EventRecorder) (*CommonUpgradeManagerImpl, error) { k8sClient, err := client.New(k8sConfig, client.Options{Scheme: scheme}) if err != nil { @@ -117,16 +119,17 @@ func NewCommonUpgradeStateManager( nodeUpgradeStateProvider := NewNodeUpgradeStateProvider(k8sClient, log, eventRecorder) commonUpgrade := CommonUpgradeManagerImpl{ - Log: log, - K8sClient: k8sClient, - K8sInterface: k8sInterface, - EventRecorder: eventRecorder, - DrainManager: NewDrainManager(k8sInterface, nodeUpgradeStateProvider, log, eventRecorder), - PodManager: NewPodManager(k8sInterface, nodeUpgradeStateProvider, log, nil, eventRecorder), - CordonManager: NewCordonManager(k8sInterface, log), - NodeUpgradeStateProvider: nodeUpgradeStateProvider, - ValidationManager: NewValidationManager(k8sInterface, log, eventRecorder, nodeUpgradeStateProvider, ""), - SafeDriverLoadManager: NewSafeDriverLoadManager(nodeUpgradeStateProvider, log), + Log: log, + K8sClient: k8sClient, + K8sInterface: k8sInterface, + EventRecorder: eventRecorder, + DrainManager: NewDrainManager(k8sInterface, nodeUpgradeStateProvider, log, eventRecorder), + PodManager: NewPodManager(k8sInterface, nodeUpgradeStateProvider, log, nil, eventRecorder), + CordonManager: NewCordonManager(k8sInterface, log), + NodeUpgradeStateProvider: nodeUpgradeStateProvider, + ValidationManager: NewValidationManager(k8sInterface, log, eventRecorder, nodeUpgradeStateProvider, ""), + SafeDriverLoadManager: NewSafeDriverLoadManager(nodeUpgradeStateProvider, log), + shouldSkipUpgradeDoneFunc: skipFunc, } return &commonUpgrade, nil @@ -487,7 +490,7 @@ func (m *CommonUpgradeManagerImpl) ProcessPodRestartNodes( } if driverPodInSync { if !m.IsValidationEnabled() { - err = m.updateNodeToUncordonOrDoneState(ctx, nodeState.Node) + err = m.updateNodeToUncordonOrDoneState(ctx, nodeState) if err != nil { return err } @@ -595,7 +598,7 @@ func (m *CommonUpgradeManagerImpl) ProcessValidationRequiredNodes( continue } - err = m.updateNodeToUncordonOrDoneState(ctx, node) + err = m.updateNodeToUncordonOrDoneState(ctx, nodeState) if err != nil { return err } @@ -670,16 +673,30 @@ func (m *CommonUpgradeManagerImpl) SkipNodeUpgrade(node *corev1.Node) bool { // updateNodeToUncordonOrDoneState skips moving the node to the UncordonRequired state if the node // was Unschedulable at the beginning of the upgrade so that the node remains in the same state as // when the upgrade started. In addition, the annotation tracking this information is removed. 
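To make the new callback's contract concrete, here is a minimal sketch of a skip function matching the `shouldSkipUpgradeDoneFunc` signature, written as if it lived inside `pkg/upgrade` so it can reuse the package's types. This is an illustration only, not part of the patch; it mirrors the ownership check that `shouldSkipUpgradeDone` performs later in this patch.

```go
// exampleSkipFunc matches the shouldSkipUpgradeDoneFunc signature added to
// CommonUpgradeManagerImpl: return true to keep the common manager from moving
// the node to uncordon-required/upgrade-done itself.
func exampleSkipFunc(requestorID string, nodeState *NodeUpgradeState) (bool, error) {
	if nodeState.NodeMaintenance == nil {
		// no nodeMaintenance object involved, keep the default (in-place) behaviour
		return false, nil
	}
	nm, ok := nodeState.NodeMaintenance.(*maintenancev1alpha1.NodeMaintenance)
	if !ok {
		return false, fmt.Errorf("unexpected object type %T", nodeState.NodeMaintenance)
	}
	// a different requestor owns the object, so let that owner drive the final transition
	return nm.Spec.RequestorID != requestorID, nil
}
```

Such a function is what `NewCommonUpgradeStateManager` now accepts as its `skipFunc` argument; passing `nil` preserves the previous behaviour.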
-func (m *CommonUpgradeManagerImpl) updateNodeToUncordonOrDoneState(ctx context.Context, node *corev1.Node) error { +func (m *CommonUpgradeManagerImpl) updateNodeToUncordonOrDoneState(ctx context.Context, + nodeState *NodeUpgradeState) error { + var ( + shouldSkip bool + err error + ) + node := nodeState.Node newUpgradeState := UpgradeStateUncordonRequired annotationKey := GetUpgradeInitialStateAnnotationKey() - if _, ok := node.Annotations[annotationKey]; ok { - m.Log.V(consts.LogLevelInfo).Info("Node was Unschedulable at beginning of upgrade, skipping uncordon", - "node", node.Name) - newUpgradeState = UpgradeStateDone + if m.shouldSkipUpgradeDoneFunc != nil { + shouldSkip, err = m.shouldSkipUpgradeDoneFunc(GetRequestorOptsFromEnvs().MaintenanceOPRequestorID, nodeState) + if err != nil { + return err + } + } + if !shouldSkip { + if _, ok := node.Annotations[annotationKey]; ok { + m.Log.V(consts.LogLevelInfo).Info("Node was Unschedulable at beginning of upgrade, skipping uncordon", + "node", node.Name) + newUpgradeState = UpgradeStateDone + } } - err := m.NodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, node, newUpgradeState) + err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, node, newUpgradeState) if err != nil { m.Log.V(consts.LogLevelError).Error( err, "Failed to change node upgrade state", "node", node.Name, "state", newUpgradeState) diff --git a/pkg/upgrade/consts.go b/pkg/upgrade/consts.go index b51a1e1..48555e8 100644 --- a/pkg/upgrade/consts.go +++ b/pkg/upgrade/consts.go @@ -21,6 +21,9 @@ const ( UpgradeStateLabelKeyFmt = "nvidia.com/%s-driver-upgrade-state" // UpgradeSkipNodeLabelKeyFmt is the format of the node label boolean key indicating to skip driver upgrade UpgradeSkipNodeLabelKeyFmt = "nvidia.com/%s-driver-upgrade.skip" + // UpgradeSkipDrainDriverSelectorFmt is the format of the pod selector key indicating to skip driver + // in upgrade drain spec + UpgradeSkipDrainDriverSelectorFmt = "nvidia.com/%s-driver-upgrade-drain.skip" // UpgradeWaitForSafeDriverLoadAnnotationKeyFmt is the format of the node annotation key indicating that // the driver is waiting for safe load. Meaning node should be cordoned and workloads should be removed from the // node before the driver can continue to load. 
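The new `UpgradeSkipDrainDriverSelectorFmt` constant above, together with the `GetUpgradeSkipDrainDriverPodSelector` helper added to `pkg/upgrade/util.go` at the end of this patch, produces the per-driver drain-skip selectors that the shared-requestor documentation asks admins to combine. A short sketch of assembling that combined selector; the `strings.Join` composition and the `drainSpec` variable are illustrative, not part of the patch:

```go
// Build the hard-coded podSelector the shared-requestor docs call for:
// "nvidia.com/ofed-driver-upgrade-drain.skip!=true,nvidia.com/gpu-driver-upgrade-drain.skip!=true"
selectors := []string{
	GetUpgradeSkipDrainDriverPodSelector("ofed"), // nvidia.com/ofed-driver-upgrade-drain.skip!=true
	GetUpgradeSkipDrainDriverPodSelector("gpu"),  // nvidia.com/gpu-driver-upgrade-drain.skip!=true
}
drainSpec.PodSelector = strings.Join(selectors, ",")
```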
@@ -39,8 +42,12 @@ const ( // (used for orphaned pods) // Setting this label will trigger setting upgrade state to upgrade-required UpgradeRequestedAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-requested" - // UpgradeRequestorModeAnnotationKeyFmt + // UpgradeRequestorModeAnnotationKeyFmt is the format of the node annotation indicating requestor driver upgrade + // mode is used for underlying node UpgradeRequestorModeAnnotationKeyFmt = "nvidia.com/%s-driver-upgrade-requestor-mode" + // UpgradeRequestorLabelKeyFmt is the format of the nodeMaintenance label marked by requestor after modifying + // the nodeMaintenance object + UpgradeRequestorLabelKeyFmt = "nvidia.com/%s-requestor" // UpgradeStateUnknown Node has this state when the upgrade flow is disabled or the node hasn't been processed yet UpgradeStateUnknown = "" // UpgradeStateUpgradeRequired is set when the driver pod on the node is not up-to-date and required upgrade diff --git a/pkg/upgrade/upgrade_inplace.go b/pkg/upgrade/upgrade_inplace.go index efbd4be..94d59d3 100644 --- a/pkg/upgrade/upgrade_inplace.go +++ b/pkg/upgrade/upgrade_inplace.go @@ -130,6 +130,10 @@ func (m *InplaceNodeStateManagerImpl) ProcessUncordonRequiredNodes( if nodeState.NodeMaintenance != nil { continue } + // check if if node upgrade is handled by requestor mode, if so node uncordon will be performed by requestor flow + if _, exists := nodeState.Node.Annotations[GetUpgradeRequestorModeAnnotationKey()]; exists { + continue + } err := m.CordonManager.Uncordon(ctx, nodeState.Node) if err != nil { m.Log.V(consts.LogLevelWarning).Error( diff --git a/pkg/upgrade/upgrade_requestor.go b/pkg/upgrade/upgrade_requestor.go index 06f5173..05a7fba 100644 --- a/pkg/upgrade/upgrade_requestor.go +++ b/pkg/upgrade/upgrade_requestor.go @@ -48,6 +48,10 @@ const ( MaintenanceOPEvictionGPU = "nvidia.com/gpu-*" // MaintenanceOPEvictionRDMA is a default filter for Network OP pods eviction MaintenanceOPEvictionRDMA = "nvidia.com/rdma*" + // DefaultNodeMaintenanceRequestorID is a default requestor ID for NVIDIA operators, in nodeMaintenance object + DefaultNodeMaintenanceRequestorID = "nvidia.operator.com" + // DefaultNodeMaintenanceNamePrefix is a default prefix for nodeMaintenance object name + DefaultNodeMaintenanceNamePrefix = "nvidia-operator" ) var ( @@ -120,8 +124,18 @@ func (p ConditionChangedPredicate) Update(e event.TypedUpdateEvent[client.Object return false } - // check for matching requestor ID - if newO.Spec.RequestorID != p.requestorID { + // check if requestor label exists, if not ignore event + // in case requestor label is removed, ignore event + if newO.Labels == nil { + p.log.Error(nil, "NodeMaintenance object has no labels, ignoring event", + "requestorID", p.requestorID, "objectName", newO.Name, "objectNamespace", newO.Namespace) + return false + } + // check if requestor label exists, if not ignore event + _, ok = newO.Labels[GetUpgradeRequestorLabelKey(p.requestorID)] + if !ok { + p.log.V(consts.LogLevelDebug).Info("NodeMaintenance object does not belong to this requestor, ignoring event", + "requestorID", p.requestorID, "objectName", newO.Name, "objectNamespace", newO.Namespace) return false } @@ -165,13 +179,16 @@ func SetDefaultNodeMaintenance(opts RequestorOptions, func (m *RequestorNodeStateManagerImpl) NewNodeMaintenance(nodeName string) *maintenancev1alpha1.NodeMaintenance { nm := defaultNodeMaintenance.DeepCopy() nm.Name = m.getNodeMaintenanceName(nodeName) + // mark nodeMaintenance object as updated by the requestor + nm.Labels = 
make(map[string]string) + nm.Labels[GetUpgradeRequestorLabelKey(m.opts.MaintenanceOPRequestorID)] = trueString nm.Spec.NodeName = nodeName return nm } -// CreateNodeMaintenance creates nodeMaintenance obj for designated node upgrade-required state -func (m *RequestorNodeStateManagerImpl) CreateNodeMaintenance(ctx context.Context, +// createNodeMaintenance creates nodeMaintenance obj for designated node upgrade-required state +func (m *RequestorNodeStateManagerImpl) createNodeMaintenance(ctx context.Context, nodeState *NodeUpgradeState) error { nm := m.NewNodeMaintenance(nodeState.Node.Name) nodeState.NodeMaintenance = nm @@ -206,8 +223,8 @@ func (m *RequestorNodeStateManagerImpl) GetNodeMaintenanceObj(ctx context.Contex return nm, nil } -// DeleteNodeMaintenance requests to delete nodeMaintenance obj -func (m *RequestorNodeStateManagerImpl) DeleteNodeMaintenance(ctx context.Context, +// deleteNodeMaintenance requests to delete nodeMaintenance obj +func (m *RequestorNodeStateManagerImpl) deleteNodeMaintenance(ctx context.Context, nodeState *NodeUpgradeState) error { _, err := validateNodeMaintenance(nodeState) if err != nil { @@ -271,7 +288,7 @@ func (m *RequestorNodeStateManagerImpl) ProcessUpgradeRequiredNodes( SetDefaultNodeMaintenance(m.opts, upgradePolicy) for _, nodeState := range currentClusterState.NodeStates[UpgradeStateUpgradeRequired] { if m.IsUpgradeRequested(nodeState.Node) { - // Make sure to remove the upgrade-requested annotation + // make sure to remove the upgrade-requested annotation err := m.NodeUpgradeStateProvider.ChangeNodeUpgradeAnnotation(ctx, nodeState.Node, GetUpgradeRequestedAnnotationKey(), "null") if err != nil { @@ -285,14 +302,14 @@ func (m *RequestorNodeStateManagerImpl) ProcessUpgradeRequiredNodes( continue } - err := m.CreateNodeMaintenance(ctx, nodeState) + err := m.createOrUpdateNodeMaintenance(ctx, nodeState) if err != nil { - m.Log.V(consts.LogLevelError).Error(err, "failed to create nodeMaintenance") + m.Log.V(consts.LogLevelError).Error(err, "failed to create or update nodeMaintenance") return err } annotationKey := GetUpgradeRequestorModeAnnotationKey() - err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeAnnotation(ctx, nodeState.Node, annotationKey, "true") + err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeAnnotation(ctx, nodeState.Node, annotationKey, trueString) if err != nil { return fmt.Errorf("failed annotate node for 'upgrade-requestor-mode'. %v", err) } @@ -306,6 +323,103 @@ func (m *RequestorNodeStateManagerImpl) ProcessUpgradeRequiredNodes( return nil } +func (m *RequestorNodeStateManagerImpl) createOrUpdateNodeMaintenance(ctx context.Context, + nodeState *NodeUpgradeState) error { + // check for existing nodeMaintenance obj and if default prefix is used + if nodeState.NodeMaintenance != nil && m.opts.NodeMaintenanceNamePrefix == DefaultNodeMaintenanceNamePrefix { + // if exists append requestorID to spec.AdditionalRequestors list + nm, ok := nodeState.NodeMaintenance.(*maintenancev1alpha1.NodeMaintenance) + if !ok { + return fmt.Errorf("failed to cast object to NodeMaintenance. 
%v", nm) + } + // check if object is owned by the requestor, if so skip re-creation + if nm.Spec.RequestorID == m.opts.MaintenanceOPRequestorID { + m.Log.V(consts.LogLevelInfo).Info("nodeMaintenance already exists", nm.Name, "skip creation") + return nil + } + + // check if requestor is already in AdditionalRequestors + if slices.Contains(nm.Spec.AdditionalRequestors, m.opts.MaintenanceOPRequestorID) { + m.Log.V(consts.LogLevelInfo).Info("requestor already in AdditionalRequestors list", + "requestorID", m.opts.MaintenanceOPRequestorID) + return nil + } + + m.Log.V(consts.LogLevelInfo).Info("appending new requestor", nm.Spec.RequestorID, "under 'AdditionalRequestors' list") + // create a deep copy of the original object before modifying it + originalNm := nm.DeepCopy() + // update AdditionalRequestor list + nm.Spec.AdditionalRequestors = append(nm.Spec.AdditionalRequestors, m.opts.MaintenanceOPRequestorID) + if nm.Labels == nil { + nm.Labels = make(map[string]string) + } + // only set label if it doesn't exist + if _, exists := nm.Labels[GetUpgradeRequestorLabelKey(m.opts.MaintenanceOPRequestorID)]; !exists { + nm.Labels[GetUpgradeRequestorLabelKey(m.opts.MaintenanceOPRequestorID)] = trueString + } + // using optimistic lock and patch command to avoid updating entire object and refraining of additionalRequestors list + // overwrite by other operators + patch := client.MergeFromWithOptions(originalNm, client.MergeFromWithOptimisticLock{}) + err := m.K8sClient.Patch(ctx, nm, patch) + if err != nil { + m.Log.V(consts.LogLevelError).Error(err, "failed to update nodeMaintenance") + return err + } + } else { + err := m.createNodeMaintenance(ctx, nodeState) + if err != nil { + m.Log.V(consts.LogLevelError).Error(err, "failed to create nodeMaintenance") + return err + } + } + + return nil +} + +func (m *RequestorNodeStateManagerImpl) deleteOrUpdateNodeMaintenance(ctx context.Context, + nodeState *NodeUpgradeState) error { + // check for existing nodeMaintenance obj + if nodeState.NodeMaintenance != nil { + nm, ok := nodeState.NodeMaintenance.(*maintenancev1alpha1.NodeMaintenance) + if !ok { + return fmt.Errorf("failed to cast object to NodeMaintenance. 
%v", nodeState.NodeMaintenance) + } + // check if object is owned by deleting requestor, if so proceed to deletion + if nm.Spec.RequestorID == m.opts.MaintenanceOPRequestorID { + m.Log.V(consts.LogLevelInfo).Info("deleting node maintenance", + nodeState.NodeMaintenance.GetName(), nodeState.NodeMaintenance.GetNamespace()) + err := m.deleteNodeMaintenance(ctx, nodeState) + if err != nil { + m.Log.V(consts.LogLevelWarning).Error( + err, "Node uncordon failed", "node", nodeState.Node) + return err + } + } else { + m.Log.V(consts.LogLevelInfo).Info("removing requestor from node maintenance additional requestors list", + nodeState.NodeMaintenance.GetName(), nodeState.NodeMaintenance.GetNamespace()) + // remove requestorID from spec.AdditionalRequestors list and patch the object + // check if requestorID is under additional requestors list + if slices.Contains(nm.Spec.AdditionalRequestors, m.opts.MaintenanceOPRequestorID) { + originalNm := nm.DeepCopy() + nm.Spec.AdditionalRequestors = slices.DeleteFunc(nm.Spec.AdditionalRequestors, func(id string) bool { + return id == m.opts.MaintenanceOPRequestorID + }) + // remove requestorID label from the object + // only remove label if it exists + if nm.Labels != nil { + delete(nm.Labels, GetUpgradeRequestorLabelKey(m.opts.MaintenanceOPRequestorID)) + } + patch := client.MergeFromWithOptions(originalNm, client.MergeFromWithOptimisticLock{}) + err := m.K8sClient.Patch(ctx, nm, patch) + if err != nil { + return fmt.Errorf("failed to update nodeMaintenance. %v", err) + } + } + } + } + + return nil +} // ProcessNodeMaintenanceRequiredNodes processes UpgradeStatePostMaintenanceRequired // by adding UpgradeStatePodRestartRequired under existing UpgradeStatePodRestartRequired nodes list. @@ -354,36 +468,29 @@ func (m *RequestorNodeStateManagerImpl) ProcessUncordonRequiredNodes( m.Log.V(consts.LogLevelInfo).Info("ProcessUncordonRequiredNodes") for _, nodeState := range currentClusterState.NodeStates[UpgradeStateUncordonRequired] { - m.Log.V(consts.LogLevelDebug).Info("deleting node maintenance", - nodeState.NodeMaintenance.GetName(), nodeState.NodeMaintenance.GetNamespace()) - // skip in case node undergoes uncordon by inplace flow + // skip in case node undergoes uncordon by in-place flow if nodeState.NodeMaintenance == nil { + // only when nodeMaintenance obj is deleted, node state should be updated to 'upgrade-done' + err := m.NodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, nodeState.Node, UpgradeStateDone) + if err != nil { + m.Log.V(consts.LogLevelError).Error( + err, "Failed to change node upgrade state", "state", UpgradeStateDone) + return err + } + // remove requestor upgrade annotation + err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeAnnotation(ctx, + nodeState.Node, GetUpgradeRequestorModeAnnotationKey(), "null") + if err != nil { + return fmt.Errorf("failed to remove '%s' annotation . %v", GetUpgradeRequestorModeAnnotationKey(), err) + } return nil } - err := m.DeleteNodeMaintenance(ctx, nodeState) + err := m.deleteOrUpdateNodeMaintenance(ctx, nodeState) if err != nil { m.Log.V(consts.LogLevelWarning).Error( err, "Node uncordon failed", "node", nodeState.Node) return err } - // this means that node maintenance obj has been deleted - err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, nodeState.Node, - UpgradeStateDone) - if err != nil { - return fmt.Errorf("failed to update node state. 
%v", err) - } - // remove requestor upgrade annotation - err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeAnnotation(ctx, - nodeState.Node, GetUpgradeRequestorModeAnnotationKey(), "null") - if err != nil { - return fmt.Errorf("failed to remove '%s' annotation . %v", GetUpgradeRequestorModeAnnotationKey(), err) - } - err = m.NodeUpgradeStateProvider.ChangeNodeUpgradeState(ctx, nodeState.Node, UpgradeStateDone) - if err != nil { - m.Log.V(consts.LogLevelError).Error( - err, "Failed to change node upgrade state", "state", UpgradeStateDone) - return err - } } return nil } @@ -424,10 +531,38 @@ func convertV1Alpha1ToMaintenance(upgradePolicy *v1alpha1.DriverUpgradePolicySpe return drainSpec, podComplition } +func setShouldSkipUpgradeDoneFunc(opts RequestorOptions) func(requestorID string, + nodeState *NodeUpgradeState) (bool, error) { + var shouldSkipFunc func(requestorID string, nodeState *NodeUpgradeState) (bool, error) + if opts.UseMaintenanceOperator { + shouldSkipFunc = shouldSkipUpgradeDone + } else { + shouldSkipFunc = nil + } + + return shouldSkipFunc +} + +// shouldSkipUpgradeDone skips upgrade-done state if other requestor +// has already set node to be uncordon +func shouldSkipUpgradeDone(requestorID string, nodeState *NodeUpgradeState) (bool, error) { + if nodeState.NodeMaintenance != nil { + nm, ok := nodeState.NodeMaintenance.(*maintenancev1alpha1.NodeMaintenance) + if !ok { + return false, fmt.Errorf("failed to cast object to NodeMaintenance. %v", nodeState.NodeMaintenance) + } + // check if nodeMaintenance is owned by the requestor + if nm.Spec.RequestorID != requestorID { + return true, nil + } + } + return false, nil +} + // GetRequestorEnvs returns requstor upgrade related options according to provided environment variables func GetRequestorOptsFromEnvs() RequestorOptions { opts := RequestorOptions{} - if os.Getenv("MAINTENANCE_OPERATOR_ENABLED") == "true" { + if os.Getenv("MAINTENANCE_OPERATOR_ENABLED") == trueString { opts.UseMaintenanceOperator = true } if os.Getenv("MAINTENANCE_OPERATOR_REQUESTOR_NAMESPACE") != "" { @@ -438,12 +573,12 @@ func GetRequestorOptsFromEnvs() RequestorOptions { if os.Getenv("MAINTENANCE_OPERATOR_REQUESTOR_ID") != "" { opts.MaintenanceOPRequestorID = os.Getenv("MAINTENANCE_OPERATOR_REQUESTOR_ID") } else { - opts.MaintenanceOPRequestorID = "nvidia.operator.com" + opts.MaintenanceOPRequestorID = DefaultNodeMaintenanceRequestorID } if os.Getenv("MAINTENANCE_OPERATOR_NODE_MAINTENANCE_PREFIX") != "" { opts.NodeMaintenanceNamePrefix = os.Getenv("MAINTENANCE_OPERATOR_NODE_MAINTENANCE_PREFIX") } else { - opts.NodeMaintenanceNamePrefix = "nvidia-operator" + opts.NodeMaintenanceNamePrefix = DefaultNodeMaintenanceNamePrefix } return opts } diff --git a/pkg/upgrade/upgrade_state.go b/pkg/upgrade/upgrade_state.go index 4c1d626..5110b60 100644 --- a/pkg/upgrade/upgrade_state.go +++ b/pkg/upgrade/upgrade_state.go @@ -67,7 +67,8 @@ func NewClusterUpgradeStateManager( k8sConfig *rest.Config, eventRecorder record.EventRecorder, opts StateOptions) (ClusterUpgradeStateManager, error) { - commonmanager, err := NewCommonUpgradeStateManager(log, k8sConfig, Scheme, eventRecorder) + shouldSkipFunc := setShouldSkipUpgradeDoneFunc(opts.Requestor) + commonmanager, err := NewCommonUpgradeStateManager(log, k8sConfig, Scheme, shouldSkipFunc, eventRecorder) if err != nil { return nil, fmt.Errorf("failed to create commonmanager upgrade state manager. 
%v", err) } diff --git a/pkg/upgrade/upgrade_state_test.go b/pkg/upgrade/upgrade_state_test.go index da54cef..abc341e 100644 --- a/pkg/upgrade/upgrade_state_test.go +++ b/pkg/upgrade/upgrade_state_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "os" + "strings" "time" . "github.com/onsi/ginkgo/v2" @@ -65,6 +66,8 @@ var _ = Describe("UpgradeStateManager tests", func() { stateManager.CordonManager = &cordonManager stateManager.PodManager = &podManager stateManager.ValidationManager = &validationManager + + testRequestorID = PrimaryTestRequestorID }) AfterEach(func() { @@ -100,6 +103,7 @@ var _ = Describe("UpgradeStateManager tests", func() { if len(nms.Items) == 0 { return true } + Expect(err).NotTo(HaveOccurred()) for _, item := range nms.Items { err = removeFinalizersOrDelete(testCtx, &item) Expect(err).NotTo(HaveOccurred()) @@ -1291,14 +1295,15 @@ var _ = Describe("UpgradeStateManager tests", func() { It("UpgradeStateManager should move to 'node-maintenance-required' while using upgrade requestor mode", func() { namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name - cancel := withUpgradeRequestorMode(testCtx, namespace) + cancel := withUpgradeRequestorMode(testCtx, namespace, true) defer cancel() clusterState := withClusterUpgradeState(3, upgrade.UpgradeStateUpgradeRequired, namespace, nil, false) policy := &v1alpha1.DriverUpgradePolicySpec{ AutoUpgrade: true, DrainSpec: &v1alpha1.DrainSpec{ - Enable: true, + Enable: true, + PodSelector: "nvidia.com/gpu-driver-upgrade-drain.skip!=true,nvidia.com/ofed-driver-upgrade-drain.skip!=true", }, } Expect(stateManagerInterface.ApplyState(testCtx, &clusterState, policy)).To(Succeed()) @@ -1335,6 +1340,7 @@ var _ = Describe("UpgradeStateManager tests", func() { // validate requestor's opts Expect(nm.Spec.RequestorID).To(Equal(opts.Requestor.MaintenanceOPRequestorID)) Expect(nm.Name).To(Equal(opts.Requestor.NodeMaintenanceNamePrefix + "-" + nm.Spec.NodeName)) + Expect(nm.Labels).To(HaveKey(upgrade.GetUpgradeRequestorLabelKey(opts.Requestor.MaintenanceOPRequestorID))) nm.Finalizers = append(nm.Finalizers, maintenancev1alpha1.MaintenanceFinalizerName) err = k8sClient.Update(testCtx, nm) Expect(err).NotTo(HaveOccurred()) @@ -1348,6 +1354,10 @@ var _ = Describe("UpgradeStateManager tests", func() { if len(nm.Finalizers) == 0 { return fmt.Errorf("missing status condition") } + if !strings.Contains(item.Spec.DrainSpec.PodSelector, + "nvidia.com/gpu-driver-upgrade-drain.skip!=true,nvidia.com/ofed-driver-upgrade-drain.skip!=true") { + return fmt.Errorf("missing pod selector. 
'%s'", item.Spec.DrainSpec.PodSelector) + } return nil }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) } @@ -1365,9 +1375,128 @@ var _ = Describe("UpgradeStateManager tests", func() { }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) }) + It("UpgradeStateManager should move to 'node-maintenance-required' while using upgrade requestor mode"+ + "and shared-requestor flow", func() { + + namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name + cancel := withUpgradeRequestorMode(testCtx, namespace, false) + defer cancel() + + testRequestorID = SecondaryTestRequestorID + clusterState := withClusterUpgradeState(3, upgrade.UpgradeStateUpgradeRequired, namespace, nil, true) + policy := &v1alpha1.DriverUpgradePolicySpec{ + AutoUpgrade: true, + DrainSpec: &v1alpha1.DrainSpec{ + Enable: true, + }, + } + + By("verify generated node-maintenance obj(s)") + nms := &maintenancev1alpha1.NodeMaintenanceList{} + Eventually(func() bool { + k8sClient.List(testCtx, nms) + return len(nms.Items) == len(clusterState.NodeStates[upgrade.UpgradeStateUpgradeRequired]) + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + + Expect(stateManagerInterface.ApplyState(testCtx, &clusterState, policy)).To(Succeed()) + + By("verify node requestor-mode-annotation") + Eventually(func() bool { + for _, nodeState := range clusterState.NodeStates[upgrade.UpgradeStateUpgradeRequired] { + node := corev1.Node{} + nodeKey := client.ObjectKey{ + Name: nodeState.Node.Name, + } + if err := k8sClient.Get(testCtx, nodeKey, &node); err != nil { + if _, ok := node.Annotations[upgrade.GetUpgradeRequestorModeAnnotationKey()]; !ok { + return false + } + } + Expect(node.Annotations[upgrade.GetUpgradeRequestorModeAnnotationKey()]).To(Equal("true")) + } + return true + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + + By("verify modified node maintenance obj(s)") + Eventually(func() bool { + nms := &maintenancev1alpha1.NodeMaintenanceList{} + err := k8sClient.List(testCtx, nms) + if err != nil { + return false + } + + for _, item := range nms.Items { + if len(item.Spec.AdditionalRequestors) == 0 { + return false + } + Expect(item.Spec.RequestorID).To(Equal(SecondaryTestRequestorID)) + Expect(item.Spec.AdditionalRequestors).To(ContainElement(PrimaryTestRequestorID)) + Expect(item.Labels).To(HaveKey(upgrade.GetUpgradeRequestorLabelKey(PrimaryTestRequestorID))) + } + return true + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + }) + It("UpgradeStateManager should stay on 'node-maintenance-required' while using upgrade requestor mode."+ + "owning node-maintenance obj while new shared-requestor added", func() { + namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name + cancel := withUpgradeRequestorMode(testCtx, namespace, false) + defer cancel() + + clusterState := withClusterUpgradeState(3, upgrade.UpgradeStateNodeMaintenanceRequired, namespace, nil, true) + policy := &v1alpha1.DriverUpgradePolicySpec{ + AutoUpgrade: true, + DrainSpec: &v1alpha1.DrainSpec{ + Enable: true, + }, + } + + nms := &maintenancev1alpha1.NodeMaintenanceList{} + err := k8sClient.List(testCtx, nms) + Expect(err).NotTo(HaveOccurred()) + for _, item := range nms.Items { + if item.Spec.AdditionalRequestors == nil { + item.Spec.AdditionalRequestors = make([]string, 0) + } + originalNm := item.DeepCopy() + item.Spec.AdditionalRequestors = 
append(item.Spec.AdditionalRequestors, SecondaryTestRequestorID) + err := k8sClient.Patch(testCtx, &item, client.MergeFrom(originalNm)) + Expect(err).NotTo(HaveOccurred()) + } + + By("verify modified node maintenance obj(s)") + Eventually(func() bool { + nms := &maintenancev1alpha1.NodeMaintenanceList{} + err := k8sClient.List(testCtx, nms) + if err != nil { + return false + } + + for _, item := range nms.Items { + if len(item.Spec.AdditionalRequestors) == 0 { + return false + } + } + return true + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + Expect(stateManagerInterface.ApplyState(testCtx, &clusterState, policy)).To(Succeed()) + + node := &corev1.Node{} + Eventually(func() error { + err := k8sClient.Get(testCtx, client.ObjectKey{Name: "node-1", Namespace: namespace}, node) + if err != nil { + return err + } + if getNodeUpgradeState(node) != upgrade.UpgradeStateNodeMaintenanceRequired { + return fmt.Errorf("missing status condition") + } + return nil + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + + }) + It("UpgradeStateManager should move to 'post-maintenance-required' while using upgrade requestor mode", func() { namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name - cancel := withUpgradeRequestorMode(testCtx, namespace) + cancel := withUpgradeRequestorMode(testCtx, namespace, true) defer cancel() clusterState := withClusterUpgradeState(1, upgrade.UpgradeStateNodeMaintenanceRequired, namespace, @@ -1390,7 +1519,8 @@ var _ = Describe("UpgradeStateManager tests", func() { }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) nmObj := &maintenancev1alpha1.NodeMaintenance{} - err := k8sClient.Get(testCtx, client.ObjectKey{Name: "node-1", Namespace: namespace}, nmObj) + err := k8sClient.Get(testCtx, client.ObjectKey{Name: opts.Requestor.NodeMaintenanceNamePrefix + "-node-1", + Namespace: namespace}, nmObj) Expect(err).NotTo(HaveOccurred()) By("set node-maintenance(s) status to mimic maintenance-operator 'Ready' condition flow") status := maintenancev1alpha1.NodeMaintenanceStatus{ @@ -1408,7 +1538,8 @@ var _ = Describe("UpgradeStateManager tests", func() { nm := &maintenancev1alpha1.NodeMaintenance{} Eventually(func() error { - err := k8sClient.Get(testCtx, client.ObjectKey{Name: "node-1", Namespace: namespace}, nm) + err := k8sClient.Get(testCtx, client.ObjectKey{Name: opts.Requestor.NodeMaintenanceNamePrefix + "-" + "node-1", + Namespace: namespace}, nm) if err != nil { return err } @@ -1439,7 +1570,7 @@ var _ = Describe("UpgradeStateManager tests", func() { It("UpgradeStateManager should move node to 'upgrade-required' in case nodeMaintenance obj is missing "+ "while using upgrade requestor mode", func() { namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name - cancel := withUpgradeRequestorMode(testCtx, namespace) + cancel := withUpgradeRequestorMode(testCtx, namespace, true) defer cancel() clusterState := withClusterUpgradeState(1, upgrade.UpgradeStateNodeMaintenanceRequired, namespace, @@ -1482,7 +1613,7 @@ var _ = Describe("UpgradeStateManager tests", func() { It("UpgradeStateManager continue inplace upgrade logic, move to 'wait-for-jobs-required' "+ "while using upgrade requestor mode", func() { namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name - cancel := withUpgradeRequestorMode(testCtx, namespace) + cancel := withUpgradeRequestorMode(testCtx, namespace, true) defer cancel() clusterState := 
withClusterUpgradeState(3, upgrade.UpgradeStateCordonRequired, namespace, nil, true) @@ -1511,7 +1642,7 @@ var _ = Describe("UpgradeStateManager tests", func() { It("UpgradeStateManager move to 'upgrade-done' using upgrade requestor mode", func() { namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name - cancel := withUpgradeRequestorMode(testCtx, namespace) + cancel := withUpgradeRequestorMode(testCtx, namespace, true) defer cancel() clusterState := withClusterUpgradeState(3, upgrade.UpgradeStateUncordonRequired, namespace, @@ -1523,7 +1654,7 @@ var _ = Describe("UpgradeStateManager tests", func() { }, } - By("verify node-maintenance obj(s) have been deleted") + By("verify node-maintenance obj(s) exists") nms := &maintenancev1alpha1.NodeMaintenanceList{} Eventually(func() bool { k8sClient.List(testCtx, nms) @@ -1531,6 +1662,16 @@ var _ = Describe("UpgradeStateManager tests", func() { }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) Expect(stateManagerInterface.ApplyState(testCtx, &clusterState, policy)).To(Succeed()) + By("verify node-maintenance obj(s) have been deleted") + nms = &maintenancev1alpha1.NodeMaintenanceList{} + Eventually(func() bool { + k8sClient.List(testCtx, nms) + return len(nms.Items) == 0 + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + // update new cluster state with deleted node-maintenance obj(s) + newClusterState, err := stateManagerInterface.BuildState(testCtx, namespace, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(stateManagerInterface.ApplyState(testCtx, newClusterState, policy)).To(Succeed()) By("verify node is in 'upgrade-done' state") node := &corev1.Node{} Eventually(func() error { @@ -1543,13 +1684,116 @@ var _ = Describe("UpgradeStateManager tests", func() { } return nil }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + }) + + It("UpgradeStateManager should move to 'uncordon-required' while using upgrade requestor mode."+ + "not-owning node-maintenance obj while new shared-requestor added", func() { + namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name + cancel := withUpgradeRequestorMode(testCtx, namespace, false) + defer cancel() + + testRequestorID = SecondaryTestRequestorID + clusterState := withClusterUpgradeState(3, upgrade.UpgradeStateValidationRequired, namespace, + map[string]string{upgrade.GetUpgradeInitialStateAnnotationKey(): "true"}, true) + policy := &v1alpha1.DriverUpgradePolicySpec{ + AutoUpgrade: true, + DrainSpec: &v1alpha1.DrainSpec{ + Enable: true, + }, + } + + nms := &maintenancev1alpha1.NodeMaintenanceList{} + Eventually(func() bool { + k8sClient.List(testCtx, nms) + return len(nms.Items) == len(clusterState.NodeStates[upgrade.UpgradeStateValidationRequired]) + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + + By("update node-maintenance obj(s) additional requestors list") + for _, item := range nms.Items { + if item.Spec.AdditionalRequestors == nil { + item.Spec.AdditionalRequestors = make([]string, 0) + } + originalNm := item.DeepCopy() + item.Spec.AdditionalRequestors = append(item.Spec.AdditionalRequestors, PrimaryTestRequestorID) + patch := client.MergeFromWithOptions(originalNm, client.MergeFromWithOptimisticLock{}) + err := k8sClient.Patch(testCtx, &item, patch) + Expect(err).NotTo(HaveOccurred()) + } + Expect(stateManagerInterface.ApplyState(testCtx, &clusterState, policy)).To(Succeed()) + + node := &corev1.Node{} + 
Eventually(func() error { + err := k8sClient.Get(testCtx, client.ObjectKey{Name: "node-1", Namespace: namespace}, node) + if err != nil { + return err + } + if getNodeUpgradeState(node) != upgrade.UpgradeStateUncordonRequired { + return fmt.Errorf("missing status condition") + } + return nil + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + }) + + It("UpgradeStateManager should stay on 'uncordon-required' while using upgrade requestor mode"+ + "and shared-requestor flow", func() { + // Verify that joined requestor has been removed itself from Spec.AdditionalRequestors list + namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name + cancel := withUpgradeRequestorMode(testCtx, namespace, false) + defer cancel() + + clusterState := withClusterUpgradeState(3, upgrade.UpgradeStateUncordonRequired, namespace, + map[string]string{upgrade.GetUpgradeRequestedAnnotationKey(): "true"}, true) + policy := &v1alpha1.DriverUpgradePolicySpec{ + AutoUpgrade: true, + DrainSpec: &v1alpha1.DrainSpec{ + Enable: true, + }, + } + + By("verify node-maintenance obj(s) exists") + nms := &maintenancev1alpha1.NodeMaintenanceList{} + Eventually(func() bool { + k8sClient.List(testCtx, nms) + return len(nms.Items) == len(clusterState.NodeStates[upgrade.UpgradeStateUncordonRequired]) + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) + + By("update node-maintenance obj(s) additional requestors list") + for _, item := range nms.Items { + if item.Spec.AdditionalRequestors == nil { + item.Spec.AdditionalRequestors = make([]string, 0) + } + originalNm := item.DeepCopy() + item.Spec.AdditionalRequestors = append(item.Spec.AdditionalRequestors, SecondaryTestRequestorID) + err := k8sClient.Patch(testCtx, &item, client.MergeFrom(originalNm)) + Expect(err).NotTo(HaveOccurred()) + } + + Expect(stateManagerInterface.ApplyState(testCtx, &clusterState, policy)).To(Succeed()) + By("verify nodes stays in 'uncordon-required' state") + Eventually(func() error { + nodes := &corev1.NodeList{} + err := k8sClient.List(testCtx, nodes) + Expect(err).NotTo(HaveOccurred()) + for _, node := range nodes.Items { + if getNodeUpgradeState(&node) != upgrade.UpgradeStateUncordonRequired { + return fmt.Errorf("missing status condition") + } + } + return nil + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(Succeed()) + By("verify node-maintenance obj(s) have been deleted") + nms = &maintenancev1alpha1.NodeMaintenanceList{} + Eventually(func() bool { + k8sClient.List(testCtx, nms) + return len(nms.Items) == 0 + }).WithTimeout(10 * time.Second).WithPolling(1 * 500 * time.Millisecond).Should(BeTrue()) }) It("UpgradeStateManager should move outdated node to UpgradeRequired states with orphaned pod if 'upgrade-requested' "+ "while using upgrade requestor mode", func() { namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name - cancel := withUpgradeRequestorMode(testCtx, namespace) + cancel := withUpgradeRequestorMode(testCtx, namespace, true) defer cancel() orphanedPod := &corev1.Pod{} @@ -1576,7 +1820,7 @@ var _ = Describe("UpgradeStateManager tests", func() { It("UpgradeStateManager should move up-to-date nodes with safe driver loading annotation "+ "to UpgradeRequired state, while using upgrade requestor mode", func() { namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name - cancel := withUpgradeRequestorMode(testCtx, namespace) + cancel := withUpgradeRequestorMode(testCtx, namespace, true) defer cancel() 
safeLoadAnnotationKey := upgrade.GetUpgradeDriverWaitForSafeLoadAnnotationKey() @@ -1602,7 +1846,7 @@ var _ = Describe("UpgradeStateManager tests", func() { "if it's in ValidationRequired, validation has completed, and node was initially Unschedulable "+ "while using upgrade requestor mode", func() { namespace := createNamespace(fmt.Sprintf("namespace-%s", id)).Name - cancel := withUpgradeRequestorMode(testCtx, namespace) + cancel := withUpgradeRequestorMode(testCtx, namespace, true) defer cancel() node := NewNode(fmt.Sprintf("node1-%s", id)). @@ -1693,12 +1937,18 @@ func withClusterUpgradeState(nodeCount int, nodeState string, namespace string, } -func withUpgradeRequestorMode(testCtx context.Context, namespace string) context.CancelFunc { - var err error +func withUpgradeRequestorMode(testCtx context.Context, namespace string, useCustomPrefix bool) context.CancelFunc { + var ( + err error + prefix string + ) + if useCustomPrefix { + prefix = "test" + } os.Setenv("MAINTENANCE_OPERATOR_ENABLED", "true") os.Setenv("MAINTENANCE_OPERATOR_REQUESTOR_NAMESPACE", namespace) - os.Setenv("MAINTENANCE_OPERATOR_REQUESTOR_ID", "network.opeator.com") - os.Setenv("MAINTENANCE_OPERATOR_NODE_MAINTENANCE_PREFIX", "test") + os.Setenv("MAINTENANCE_OPERATOR_REQUESTOR_ID", testRequestorID) + os.Setenv("MAINTENANCE_OPERATOR_NODE_MAINTENANCE_PREFIX", prefix) _, cancelFn := context.WithCancel(testCtx) opts.Requestor = upgrade.GetRequestorOptsFromEnvs() diff --git a/pkg/upgrade/upgrade_suit_test.go b/pkg/upgrade/upgrade_suit_test.go index ddbeee7..27e6528 100644 --- a/pkg/upgrade/upgrade_suit_test.go +++ b/pkg/upgrade/upgrade_suit_test.go @@ -48,6 +48,11 @@ import ( // +kubebuilder:scaffold:imports ) +const ( + PrimaryTestRequestorID = "nvidia.operator.com" + SecondaryTestRequestorID = "foo.bar.com" +) + // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. 
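The `withUpgradeRequestorMode` test helper above also doubles as a reference for how requestor mode is configured in practice: everything is driven by environment variables read by `GetRequestorOptsFromEnvs`. A hedged sketch of the minimal caller-side setup follows; the namespace value is an assumption, while the variable names and defaults come from this patch:

```go
// Enable requestor mode; leaving MAINTENANCE_OPERATOR_REQUESTOR_ID and
// MAINTENANCE_OPERATOR_NODE_MAINTENANCE_PREFIX unset makes GetRequestorOptsFromEnvs fall
// back to DefaultNodeMaintenanceRequestorID and DefaultNodeMaintenanceNamePrefix, which the
// shared-requestor flow expects all participating operators to share.
os.Setenv("MAINTENANCE_OPERATOR_ENABLED", "true")
os.Setenv("MAINTENANCE_OPERATOR_REQUESTOR_NAMESPACE", "network-operator") // assumed namespace
opts := upgrade.GetRequestorOptsFromEnvs()
// opts.UseMaintenanceOperator == true, opts.MaintenanceOPRequestorID == "nvidia.operator.com",
// opts.NodeMaintenanceNamePrefix == "nvidia-operator"
```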
var ( @@ -64,6 +69,7 @@ var ( eventRecorder = record.NewFakeRecorder(100) createdObjects []client.Object testCtx context.Context + testRequestorID string ) func TestAPIs(t *testing.T) { @@ -261,12 +267,12 @@ type NodeMaintenance struct { func NewNodeMaintenance(name, namespace string) NodeMaintenance { nm := &maintenancev1alpha1.NodeMaintenance{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: opts.Requestor.NodeMaintenanceNamePrefix + "-" + name, Namespace: namespace, }, Spec: maintenancev1alpha1.NodeMaintenanceSpec{ NodeName: name, - RequestorID: "dummy-requestor.com", + RequestorID: testRequestorID, }, } diff --git a/pkg/upgrade/util.go b/pkg/upgrade/util.go index 735e549..fc57bb2 100644 --- a/pkg/upgrade/util.go +++ b/pkg/upgrade/util.go @@ -97,6 +97,11 @@ func SetDriverName(driver string) { DriverName = driver } +// GetUpgradeSkipDrainDriverPodSelector returns pod selector to skip drain for a given driver +func GetUpgradeSkipDrainDriverPodSelector(driverName string) string { + return fmt.Sprintf(UpgradeSkipDrainDriverSelectorFmt+"!=true", driverName) +} + // GetUpgradeStateLabelKey returns state label key used for upgrades func GetUpgradeStateLabelKey() string { return fmt.Sprintf(UpgradeStateLabelKeyFmt, DriverName) @@ -107,6 +112,11 @@ func GetUpgradeSkipNodeLabelKey() string { return fmt.Sprintf(UpgradeSkipNodeLabelKeyFmt, DriverName) } +// GetUpgradeRequestorLabelKey returns nodeMaintenance label used to mark obj as modified by requestor +func GetUpgradeRequestorLabelKey(requestorID string) string { + return fmt.Sprintf(UpgradeRequestorLabelKeyFmt, requestorID) +} + // GetUpgradeDriverWaitForSafeLoadAnnotationKey returns the key for annotation used to mark node as waiting for driver // safe load func GetUpgradeDriverWaitForSafeLoadAnnotationKey() string {
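Taken together, `GetUpgradeRequestorLabelKey` above and the `AdditionalRequestors` handling in `upgrade_requestor.go` give the shared-requestor "join" step a compact shape. Below is a condensed sketch of that step for a non-owning operator, written as if inside `pkg/upgrade`; it is simplified from `createOrUpdateNodeMaintenance` in this patch, with the existence, name-prefix, and logging details trimmed:

```go
// joinExistingNodeMaintenance appends this operator to an already-existing
// NodeMaintenance owned by another requestor, using an optimistic-lock merge patch
// so concurrent operators cannot overwrite each other's AdditionalRequestors entries.
func joinExistingNodeMaintenance(ctx context.Context, c client.Client,
	nm *maintenancev1alpha1.NodeMaintenance, requestorID string) error {
	if nm.Spec.RequestorID == requestorID ||
		slices.Contains(nm.Spec.AdditionalRequestors, requestorID) {
		return nil // already the owner, or already joined
	}
	original := nm.DeepCopy()
	nm.Spec.AdditionalRequestors = append(nm.Spec.AdditionalRequestors, requestorID)
	if nm.Labels == nil {
		nm.Labels = map[string]string{}
	}
	nm.Labels[GetUpgradeRequestorLabelKey(requestorID)] = "true"
	// MergeFromWithOptimisticLock adds resourceVersion to the patch, so a stale
	// object produces a conflict instead of silently dropping another requestor.
	patch := client.MergeFromWithOptions(original, client.MergeFromWithOptimisticLock{})
	return c.Patch(ctx, nm, patch)
}
```

The matching "leave" step in `deleteOrUpdateNodeMaintenance` is the mirror image: `slices.DeleteFunc` on `AdditionalRequestors`, `delete` on the requestor label, and the same optimistic-lock patch, so only the owning requestor ever issues the actual delete.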