From ad0859ef8ada78d57a93d453cd6b444c1d02a34d Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Sat, 11 Apr 2026 08:28:48 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix:=20allow=20reconciliation=20?= =?UTF-8?q?of=20deadline-exceeded=20ClusterObjectSets?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the skipProgressDeadlineExceededPredicate that blocked all update events for COS objects with ProgressDeadlineExceeded. This predicate prevented archival of stuck revisions because the lifecycle state patch was dropped as an update event. To prevent the reconcile loop that the predicate was masking, markAsProgressing now sets ProgressDeadlineExceeded instead of RollingOut/Retrying when the deadline has been exceeded. Terminal reasons (Succeeded) always apply. Unregistered reasons panic. Continue reconciling after ProgressDeadlineExceeded rather than clearing the error and stopping requeue. This allows revisions to recover if a transient error resolves itself, even after the deadline was exceeded. Extract durationUntilDeadline as a shared helper for deadline computation. Add a deadlineAwareRateLimiter that caps exponential backoff at the deadline so ProgressDeadlineExceeded is set promptly even during error retries. Move the deadline requeue logic into requeueForDeadline, called from within reconcile when probes are still failing. Add an e2e test that creates a COS with a never-ready deployment, waits for ProgressDeadlineExceeded, archives the COS, and verifies cleanup. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../clusterobjectset_controller.go | 188 ++++++++++++------ .../clusterobjectset_controller_test.go | 2 +- test/e2e/features/revision.feature | 69 ++++++- test/e2e/steps/steps.go | 18 ++ 4 files changed, 217 insertions(+), 60 deletions(-) diff --git a/internal/operator-controller/controllers/clusterobjectset_controller.go b/internal/operator-controller/controllers/clusterobjectset_controller.go index b34730b77..72f9265c4 100644 --- a/internal/operator-controller/controllers/clusterobjectset_controller.go +++ b/internal/operator-controller/controllers/clusterobjectset_controller.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" "pkg.package-operator.run/boxcutter" "pkg.package-operator.run/boxcutter/machinery" @@ -30,8 +31,8 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -81,29 +82,6 @@ func (c *ClusterObjectSetReconciler) Reconcile(ctx context.Context, req ctrl.Req reconciledRev := existingRev.DeepCopy() res, reconcileErr := c.reconcile(ctx, reconciledRev) - if pd := existingRev.Spec.ProgressDeadlineMinutes; pd > 0 { - cnd := meta.FindStatusCondition(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing) - isStillProgressing := cnd != nil && cnd.Status == metav1.ConditionTrue && cnd.Reason != ocv1.ReasonSucceeded - succeeded := meta.IsStatusConditionTrue(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded) - // check if we reached the progress deadline only if the revision is still progressing and has not succeeded yet - if isStillProgressing && !succeeded { - timeout := time.Duration(pd) * time.Minute - if c.Clock.Since(existingRev.CreationTimestamp.Time) > timeout { - // progress deadline reached, reset any errors and stop reconciling this revision - markAsNotProgressing(reconciledRev, ocv1.ReasonProgressDeadlineExceeded, fmt.Sprintf("Revision has not rolled out for %d minute(s).", pd)) - reconcileErr = nil - res = ctrl.Result{} - } else if reconcileErr == nil { - // We want to requeue so far in the future that the next reconciliation - // can detect if the revision did not progress within the given timeout. - // Thus, we plan the next reconcile slightly after (+2secs) the timeout is passed. - drift := 2 * time.Second - requeueAfter := existingRev.CreationTimestamp.Time.Add(timeout).Add(drift).Sub(c.Clock.Now()).Round(time.Second) - l.Info(fmt.Sprintf("ProgressDeadline not exceeded, requeue after ~%v to check again.", requeueAfter)) - res = ctrl.Result{RequeueAfter: requeueAfter} - } - } - } // Do checks before any Update()s, as Update() may modify the resource structure! updateStatus := !equality.Semantic.DeepEqual(existingRev.Status, reconciledRev.Status) @@ -144,15 +122,18 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl return c.delete(ctx, cos) } + remaining, hasDeadline := c.durationUntilDeadline(cos) + isDeadlineExceeded := hasDeadline && remaining <= 0 + phases, opts, err := c.buildBoxcutterPhases(ctx, cos) if err != nil { - setRetryingConditions(cos, err.Error()) + setRetryingConditions(cos, err.Error(), isDeadlineExceeded) return ctrl.Result{}, fmt.Errorf("converting to boxcutter revision: %v", err) } revisionEngine, err := c.RevisionEngineFactory.CreateRevisionEngine(ctx, cos) if err != nil { - setRetryingConditions(cos, err.Error()) + setRetryingConditions(cos, err.Error(), isDeadlineExceeded) return ctrl.Result{}, fmt.Errorf("failed to create revision engine: %v", err) } @@ -178,7 +159,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl if err := c.establishWatch(ctx, cos, revision); err != nil { werr := fmt.Errorf("establish watch: %v", err) - setRetryingConditions(cos, werr.Error()) + setRetryingConditions(cos, werr.Error(), isDeadlineExceeded) return ctrl.Result{}, werr } @@ -188,7 +169,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl // Log detailed reconcile reports only in debug mode (V(1)) to reduce verbosity. l.V(1).Info("reconcile report", "report", rres.String()) } - setRetryingConditions(cos, err.Error()) + setRetryingConditions(cos, err.Error(), isDeadlineExceeded) return ctrl.Result{}, fmt.Errorf("revision reconcile: %v", err) } @@ -196,14 +177,14 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl // TODO: report status, backoff? if verr := rres.GetValidationError(); verr != nil { l.Error(fmt.Errorf("%w", verr), "preflight validation failed, retrying after 10s") - setRetryingConditions(cos, fmt.Sprintf("revision validation error: %s", verr)) + setRetryingConditions(cos, fmt.Sprintf("revision validation error: %s", verr), isDeadlineExceeded) return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } for i, pres := range rres.GetPhases() { if verr := pres.GetValidationError(); verr != nil { l.Error(fmt.Errorf("%w", verr), "phase preflight validation failed, retrying after 10s", "phase", i) - setRetryingConditions(cos, fmt.Sprintf("phase %d validation error: %s", i, verr)) + setRetryingConditions(cos, fmt.Sprintf("phase %d validation error: %s", i, verr), isDeadlineExceeded) return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } @@ -216,14 +197,14 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl if len(collidingObjs) > 0 { l.Error(fmt.Errorf("object collision detected"), "object collision, retrying after 10s", "phase", i, "collisions", collidingObjs) - setRetryingConditions(cos, fmt.Sprintf("revision object collisions in phase %d\n%s", i, strings.Join(collidingObjs, "\n\n"))) + setRetryingConditions(cos, fmt.Sprintf("revision object collisions in phase %d\n%s", i, strings.Join(collidingObjs, "\n\n")), isDeadlineExceeded) return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } } revVersion := cos.GetAnnotations()[labels.BundleVersionKey] if rres.InTransition() { - markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion)) + markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion), isDeadlineExceeded) } //nolint:nestif @@ -243,7 +224,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl } } - markAsProgressing(cos, ocv1.ReasonSucceeded, fmt.Sprintf("Revision %s has rolled out.", revVersion)) + markAsProgressing(cos, ocv1.ReasonSucceeded, fmt.Sprintf("Revision %s has rolled out.", revVersion), isDeadlineExceeded) markAsAvailable(cos, ocv1.ClusterObjectSetReasonProbesSucceeded, "Objects are available and pass all probes.") // We'll probably only want to remove this once we are done updating the ClusterExtension conditions @@ -288,14 +269,23 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl } else { markAsUnavailable(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion)) } - if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing) == nil { - markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion)) - } + markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion), isDeadlineExceeded) + return c.requeueForDeadline(cos), nil } return ctrl.Result{}, nil } +// requeueForDeadline returns a Result that requeues at the progress deadline +// if one is configured and has not yet been exceeded. This ensures that +// ProgressDeadlineExceeded is set promptly even when no object events occur. +func (c *ClusterObjectSetReconciler) requeueForDeadline(cos *ocv1.ClusterObjectSet) ctrl.Result { + if remaining, hasDeadline := c.durationUntilDeadline(cos); hasDeadline && remaining > 0 { + return ctrl.Result{RequeueAfter: remaining} + } + return ctrl.Result{} +} + func (c *ClusterObjectSetReconciler) delete(ctx context.Context, cos *ocv1.ClusterObjectSet) (ctrl.Result, error) { if err := c.TrackingCache.Free(ctx, cos); err != nil { markAsAvailableUnknown(cos, ocv1.ClusterObjectSetReasonReconciling, err.Error()) @@ -311,11 +301,11 @@ func (c *ClusterObjectSetReconciler) archive(ctx context.Context, revisionEngine tdres, err := revisionEngine.Teardown(ctx, revision) if err != nil { err = fmt.Errorf("error archiving revision: %v", err) - setRetryingConditions(cos, err.Error()) + setRetryingConditions(cos, err.Error(), false) return ctrl.Result{}, err } if tdres != nil && !tdres.IsComplete() { - setRetryingConditions(cos, "removing revision resources that are not owned by another revision") + setRetryingConditions(cos, "removing revision resources that are not owned by another revision", false) return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } // Ensure conditions are set before removing the finalizer when archiving @@ -333,29 +323,19 @@ type Sourcoser interface { } func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error { - skipProgressDeadlineExceededPredicate := predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - rev, ok := e.ObjectNew.(*ocv1.ClusterObjectSet) - if !ok { - return true - } - // allow deletions to happen - if !rev.DeletionTimestamp.IsZero() { - return true - } - if cnd := meta.FindStatusCondition(rev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing); cnd != nil && cnd.Status == metav1.ConditionFalse && cnd.Reason == ocv1.ReasonProgressDeadlineExceeded { - return false - } - return true - }, - } c.Clock = clock.RealClock{} return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{ + RateLimiter: newDeadlineAwareRateLimiter( + workqueue.DefaultTypedControllerRateLimiter[ctrl.Request](), + mgr.GetClient(), + c.Clock, + ), + }). For( &ocv1.ClusterObjectSet{}, builder.WithPredicates( predicate.ResourceVersionChangedPredicate{}, - skipProgressDeadlineExceededPredicate, ), ). WatchesRawSource( @@ -367,6 +347,51 @@ func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(c) } +// deadlineAwareRateLimiter wraps a delegate rate limiter and caps the backoff +// duration to the time remaining until the COS progress deadline (+2s), ensuring +// that ProgressDeadlineExceeded is set promptly even during exponential backoff. +type deadlineAwareRateLimiter struct { + delegate workqueue.TypedRateLimiter[ctrl.Request] + client client.Reader + clock clock.Clock +} + +func newDeadlineAwareRateLimiter( + delegate workqueue.TypedRateLimiter[ctrl.Request], + c client.Reader, + clk clock.Clock, +) *deadlineAwareRateLimiter { + return &deadlineAwareRateLimiter{delegate: delegate, client: c, clock: clk} +} + +func (r *deadlineAwareRateLimiter) When(item ctrl.Request) time.Duration { + backoff := r.delegate.When(item) + + cos := &ocv1.ClusterObjectSet{} + if err := r.client.Get(context.Background(), item.NamespacedName, cos); err != nil { + return backoff + } + + remaining, hasDeadline := durationUntilDeadline(r.clock, cos) + if !hasDeadline { + return backoff + } + + deadline := remaining + 2*time.Second + if deadline > 0 && deadline < backoff { + return deadline + } + return backoff +} + +func (r *deadlineAwareRateLimiter) Forget(item ctrl.Request) { + r.delegate.Forget(item) +} + +func (r *deadlineAwareRateLimiter) NumRequeues(item ctrl.Request) int { + return r.delegate.NumRequeues(item) +} + func (c *ClusterObjectSetReconciler) establishWatch(ctx context.Context, cos *ocv1.ClusterObjectSet, revision boxcutter.RevisionBuilder) error { gvks := sets.New[schema.GroupVersionKind]() for _, phase := range revision.GetPhases() { @@ -631,14 +656,61 @@ func buildProgressionProbes(progressionProbes []ocv1.ProgressionProbe) (probing. return userProbes, nil } -func setRetryingConditions(cos *ocv1.ClusterObjectSet, message string) { - markAsProgressing(cos, ocv1.ClusterObjectSetReasonRetrying, message) +func setRetryingConditions(cos *ocv1.ClusterObjectSet, message string, isDeadlineExceeded bool) { + markAsProgressing(cos, ocv1.ClusterObjectSetReasonRetrying, message, isDeadlineExceeded) if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeAvailable) != nil { markAsAvailableUnknown(cos, ocv1.ClusterObjectSetReasonReconciling, message) } } -func markAsProgressing(cos *ocv1.ClusterObjectSet, reason, message string) { +// durationUntilDeadline returns how much time is left before the progress deadline +// is exceeded. A negative duration means the deadline has already passed. If there +// is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded), +// it returns -1 and false. +func (c *ClusterObjectSetReconciler) durationUntilDeadline(cos *ocv1.ClusterObjectSet) (time.Duration, bool) { + return durationUntilDeadline(c.Clock, cos) +} + +// durationUntilDeadline returns how much time is left before the progress deadline +// is exceeded. A negative duration means the deadline has already passed. If there +// is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded), +// it returns -1 and false. +func durationUntilDeadline(clk clock.Clock, cos *ocv1.ClusterObjectSet) (time.Duration, bool) { + pd := cos.Spec.ProgressDeadlineMinutes + if pd <= 0 { + return -1, false + } + // Succeeded is a latch — once set, it's never cleared. A revision that + // has already succeeded should not be blocked by the deadline, even if + // it temporarily goes back to InTransition (e.g., recovery after drift). + if meta.IsStatusConditionTrue(cos.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded) { + return -1, false + } + timeout := time.Duration(pd) * time.Minute + return timeout - clk.Since(cos.CreationTimestamp.Time), true +} + +// markAsProgressing sets the Progressing condition to True with the given reason. +// +// For non-terminal reasons (RollingOut, Retrying), if isDeadlineExceeded is true, +// the condition is set to Progressing=False/ProgressDeadlineExceeded instead. This +// prevents a reconcile loop where RollingOut and ProgressDeadlineExceeded overwrite +// each other on every cycle. +// +// Terminal reasons (Succeeded) are always applied. Unregistered reasons panic. +func markAsProgressing(cos *ocv1.ClusterObjectSet, reason, message string, isDeadlineExceeded bool) { + switch reason { + case ocv1.ReasonSucceeded: + // Terminal — always apply. + case ocv1.ReasonRollingOut, ocv1.ClusterObjectSetReasonRetrying: + if isDeadlineExceeded { + markAsNotProgressing(cos, ocv1.ReasonProgressDeadlineExceeded, + fmt.Sprintf("Revision has not rolled out for %d minute(s).", cos.Spec.ProgressDeadlineMinutes)) + return + } + default: + panic(fmt.Sprintf("unregistered progressing reason: %q", reason)) + } meta.SetStatusCondition(&cos.Status.Conditions, metav1.Condition{ Type: ocv1.ClusterObjectSetTypeProgressing, Status: metav1.ConditionTrue, diff --git a/internal/operator-controller/controllers/clusterobjectset_controller_test.go b/internal/operator-controller/controllers/clusterobjectset_controller_test.go index b3b10f575..b43a608e1 100644 --- a/internal/operator-controller/controllers/clusterobjectset_controller_test.go +++ b/internal/operator-controller/controllers/clusterobjectset_controller_test.go @@ -988,7 +988,7 @@ func Test_ClusterObjectSetReconciler_Reconcile_ProgressDeadline(t *testing.T) { revisionResult: &mockRevisionResult{ inTransition: true, }, - reconcileResult: ctrl.Result{RequeueAfter: 62 * time.Second}, + reconcileResult: ctrl.Result{RequeueAfter: 60 * time.Second}, validate: func(t *testing.T, c client.Client) { rev := &ocv1.ClusterObjectSet{} err := c.Get(t.Context(), client.ObjectKey{ diff --git a/test/e2e/features/revision.feature b/test/e2e/features/revision.feature index dd6d9e940..2b1a19720 100644 --- a/test/e2e/features/revision.feature +++ b/test/e2e/features/revision.feature @@ -442,4 +442,71 @@ Feature: Install ClusterObjectSet Then ClusterObjectSet "${COS_NAME}" reports Progressing as True with Reason Succeeded And ClusterObjectSet "${COS_NAME}" reports Available as True with Reason ProbesSucceeded And resource "configmap/test-configmap-ref" is installed - And resource "deployment/test-httpd" is installed \ No newline at end of file + And resource "deployment/test-httpd" is installed + + Scenario: Archiving a COS with ProgressDeadlineExceeded cleans up its resources + Given min value for ClusterObjectSet .spec.progressDeadlineMinutes is set to 1 + And ServiceAccount "olm-sa" with needed permissions is available in test namespace + When ClusterObjectSet is applied + """ + apiVersion: olm.operatorframework.io/v1 + kind: ClusterObjectSet + metadata: + annotations: + olm.operatorframework.io/service-account-name: olm-sa + olm.operatorframework.io/service-account-namespace: ${TEST_NAMESPACE} + name: ${COS_NAME} + spec: + lifecycleState: Active + collisionProtection: Prevent + progressDeadlineMinutes: 1 + progressionProbes: + - selector: + type: GroupKind + groupKind: + group: apps + kind: Deployment + assertions: + - type: ConditionEqual + conditionEqual: + type: Available + status: "True" + phases: + - name: resources + objects: + - object: + apiVersion: v1 + kind: ConfigMap + metadata: + name: test-configmap + namespace: ${TEST_NAMESPACE} + data: + foo: bar + - object: + apiVersion: apps/v1 + kind: Deployment + metadata: + name: test-deployment + namespace: ${TEST_NAMESPACE} + spec: + replicas: 1 + selector: + matchLabels: + app: never-ready + template: + metadata: + labels: + app: never-ready + spec: + containers: + - name: never-ready + image: does-not-exist:latest + revision: 1 + """ + Then resource "configmap/test-configmap" is installed + And resource "deployment/test-deployment" is installed + And ClusterObjectSet "${COS_NAME}" reports Progressing as False with Reason ProgressDeadlineExceeded + When ClusterObjectSet "${COS_NAME}" lifecycle is set to "Archived" + Then ClusterObjectSet "${COS_NAME}" is archived + And resource "configmap/test-configmap" is eventually not found + And resource "deployment/test-deployment" is eventually not found diff --git a/test/e2e/steps/steps.go b/test/e2e/steps/steps.go index 2b5603ab7..3cf584054 100644 --- a/test/e2e/steps/steps.go +++ b/test/e2e/steps/steps.go @@ -77,6 +77,7 @@ func RegisterSteps(sc *godog.ScenarioContext) { sc.Step(`^(?i)ClusterExtension is applied(?:\s+.*)?$`, ResourceIsApplied) sc.Step(`^(?i)ClusterExtension is updated to version "([^"]+)"$`, ClusterExtensionVersionUpdate) sc.Step(`^(?i)ClusterExtension is updated(?:\s+.*)?$`, ResourceIsApplied) + sc.Step(`^(?i)ClusterObjectSet "([^"]+)" lifecycle is set to "([^"]+)"$`, ClusterObjectSetLifecycleUpdate) sc.Step(`^(?i)ClusterExtension is available$`, ClusterExtensionIsAvailable) sc.Step(`^(?i)ClusterExtension is rolled out$`, ClusterExtensionIsRolledOut) sc.Step(`^(?i)ClusterExtension resources are created and labeled$`, ClusterExtensionResourcesCreatedAndAreLabeled) @@ -293,6 +294,23 @@ func ClusterExtensionVersionUpdate(ctx context.Context, version string) error { return err } +// ClusterObjectSetLifecycleUpdate patches the ClusterObjectSet's lifecycleState to the specified value. +func ClusterObjectSetLifecycleUpdate(ctx context.Context, cosName, lifecycle string) error { + sc := scenarioCtx(ctx) + cosName = substituteScenarioVars(cosName, sc) + patch := map[string]any{ + "spec": map[string]any{ + "lifecycleState": lifecycle, + }, + } + pb, err := json.Marshal(patch) + if err != nil { + return err + } + _, err = k8sClient("patch", "clusterobjectset", cosName, "--type", "merge", "-p", string(pb)) + return err +} + // ResourceIsApplied applies the provided YAML resource to the cluster and in case of ClusterExtension or ClusterObjectSet it captures // its name in the test context so that it can be referred to in later steps with ${NAME} or ${COS_NAME}, respectively func ResourceIsApplied(ctx context.Context, yamlTemplate *godog.DocString) error {