@@ -21,6 +21,7 @@ import (
2121 "k8s.io/apimachinery/pkg/runtime/schema"
2222 "k8s.io/apimachinery/pkg/types"
2323 "k8s.io/apimachinery/pkg/util/sets"
24+ "k8s.io/client-go/util/workqueue"
2425 "k8s.io/utils/clock"
2526 "pkg.package-operator.run/boxcutter"
2627 "pkg.package-operator.run/boxcutter/machinery"
@@ -30,8 +31,8 @@ import (
3031 ctrl "sigs.k8s.io/controller-runtime"
3132 "sigs.k8s.io/controller-runtime/pkg/builder"
3233 "sigs.k8s.io/controller-runtime/pkg/client"
34+ "sigs.k8s.io/controller-runtime/pkg/controller"
3335 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
34- "sigs.k8s.io/controller-runtime/pkg/event"
3536 "sigs.k8s.io/controller-runtime/pkg/handler"
3637 "sigs.k8s.io/controller-runtime/pkg/log"
3738 "sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -81,29 +82,6 @@ func (c *ClusterObjectSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
8182 reconciledRev := existingRev .DeepCopy ()
8283 res , reconcileErr := c .reconcile (ctx , reconciledRev )
8384
84- if pd := existingRev .Spec .ProgressDeadlineMinutes ; pd > 0 {
85- cnd := meta .FindStatusCondition (reconciledRev .Status .Conditions , ocv1 .ClusterObjectSetTypeProgressing )
86- isStillProgressing := cnd != nil && cnd .Status == metav1 .ConditionTrue && cnd .Reason != ocv1 .ReasonSucceeded
87- succeeded := meta .IsStatusConditionTrue (reconciledRev .Status .Conditions , ocv1 .ClusterObjectSetTypeSucceeded )
88- // check if we reached the progress deadline only if the revision is still progressing and has not succeeded yet
89- if isStillProgressing && ! succeeded {
90- timeout := time .Duration (pd ) * time .Minute
91- if c .Clock .Since (existingRev .CreationTimestamp .Time ) > timeout {
92- // progress deadline reached, reset any errors and stop reconciling this revision
93- markAsNotProgressing (reconciledRev , ocv1 .ReasonProgressDeadlineExceeded , fmt .Sprintf ("Revision has not rolled out for %d minute(s)." , pd ))
94- reconcileErr = nil
95- res = ctrl.Result {}
96- } else if reconcileErr == nil {
97- // We want to requeue so far in the future that the next reconciliation
98- // can detect if the revision did not progress within the given timeout.
99- // Thus, we plan the next reconcile slightly after (+2secs) the timeout is passed.
100- drift := 2 * time .Second
101- requeueAfter := existingRev .CreationTimestamp .Time .Add (timeout ).Add (drift ).Sub (c .Clock .Now ()).Round (time .Second )
102- l .Info (fmt .Sprintf ("ProgressDeadline not exceeded, requeue after ~%v to check again." , requeueAfter ))
103- res = ctrl.Result {RequeueAfter : requeueAfter }
104- }
105- }
106- }
10785 // Do checks before any Update()s, as Update() may modify the resource structure!
10886 updateStatus := ! equality .Semantic .DeepEqual (existingRev .Status , reconciledRev .Status )
10987
@@ -144,15 +122,18 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
144122 return c .delete (ctx , cos )
145123 }
146124
125+ remaining , hasDeadline := c .durationUntilDeadline (cos )
126+ isDeadlineExceeded := hasDeadline && remaining <= 0
127+
147128 phases , opts , err := c .buildBoxcutterPhases (ctx , cos )
148129 if err != nil {
149- setRetryingConditions (cos , err .Error ())
130+ setRetryingConditions (cos , err .Error (), isDeadlineExceeded )
150131 return ctrl.Result {}, fmt .Errorf ("converting to boxcutter revision: %v" , err )
151132 }
152133
153134 revisionEngine , err := c .RevisionEngineFactory .CreateRevisionEngine (ctx , cos )
154135 if err != nil {
155- setRetryingConditions (cos , err .Error ())
136+ setRetryingConditions (cos , err .Error (), isDeadlineExceeded )
156137 return ctrl.Result {}, fmt .Errorf ("failed to create revision engine: %v" , err )
157138 }
158139
@@ -178,7 +159,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
178159
179160 if err := c .establishWatch (ctx , cos , revision ); err != nil {
180161 werr := fmt .Errorf ("establish watch: %v" , err )
181- setRetryingConditions (cos , werr .Error ())
162+ setRetryingConditions (cos , werr .Error (), isDeadlineExceeded )
182163 return ctrl.Result {}, werr
183164 }
184165
@@ -188,22 +169,22 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
188169 // Log detailed reconcile reports only in debug mode (V(1)) to reduce verbosity.
189170 l .V (1 ).Info ("reconcile report" , "report" , rres .String ())
190171 }
191- setRetryingConditions (cos , err .Error ())
172+ setRetryingConditions (cos , err .Error (), isDeadlineExceeded )
192173 return ctrl.Result {}, fmt .Errorf ("revision reconcile: %v" , err )
193174 }
194175
195176 // Retry failing preflight checks with a flat 10s retry.
196177 // TODO: report status, backoff?
197178 if verr := rres .GetValidationError (); verr != nil {
198179 l .Error (fmt .Errorf ("%w" , verr ), "preflight validation failed, retrying after 10s" )
199- setRetryingConditions (cos , fmt .Sprintf ("revision validation error: %s" , verr ))
180+ setRetryingConditions (cos , fmt .Sprintf ("revision validation error: %s" , verr ), isDeadlineExceeded )
200181 return ctrl.Result {RequeueAfter : 10 * time .Second }, nil
201182 }
202183
203184 for i , pres := range rres .GetPhases () {
204185 if verr := pres .GetValidationError (); verr != nil {
205186 l .Error (fmt .Errorf ("%w" , verr ), "phase preflight validation failed, retrying after 10s" , "phase" , i )
206- setRetryingConditions (cos , fmt .Sprintf ("phase %d validation error: %s" , i , verr ))
187+ setRetryingConditions (cos , fmt .Sprintf ("phase %d validation error: %s" , i , verr ), isDeadlineExceeded )
207188 return ctrl.Result {RequeueAfter : 10 * time .Second }, nil
208189 }
209190
@@ -216,14 +197,14 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
216197
217198 if len (collidingObjs ) > 0 {
218199 l .Error (fmt .Errorf ("object collision detected" ), "object collision, retrying after 10s" , "phase" , i , "collisions" , collidingObjs )
219- setRetryingConditions (cos , fmt .Sprintf ("revision object collisions in phase %d\n %s" , i , strings .Join (collidingObjs , "\n \n " )))
200+ setRetryingConditions (cos , fmt .Sprintf ("revision object collisions in phase %d\n %s" , i , strings .Join (collidingObjs , "\n \n " )), isDeadlineExceeded )
220201 return ctrl.Result {RequeueAfter : 10 * time .Second }, nil
221202 }
222203 }
223204
224205 revVersion := cos .GetAnnotations ()[labels .BundleVersionKey ]
225206 if rres .InTransition () {
226- markAsProgressing (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ))
207+ markAsProgressing (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ), isDeadlineExceeded )
227208 }
228209
229210 //nolint:nestif
@@ -243,7 +224,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
243224 }
244225 }
245226
246- markAsProgressing (cos , ocv1 .ReasonSucceeded , fmt .Sprintf ("Revision %s has rolled out." , revVersion ))
227+ markAsProgressing (cos , ocv1 .ReasonSucceeded , fmt .Sprintf ("Revision %s has rolled out." , revVersion ), isDeadlineExceeded )
247228 markAsAvailable (cos , ocv1 .ClusterObjectSetReasonProbesSucceeded , "Objects are available and pass all probes." )
248229
249230 // We'll probably only want to remove this once we are done updating the ClusterExtension conditions
@@ -288,14 +269,23 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
288269 } else {
289270 markAsUnavailable (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ))
290271 }
291- if meta .FindStatusCondition (cos .Status .Conditions , ocv1 .ClusterObjectSetTypeProgressing ) == nil {
292- markAsProgressing (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ))
293- }
272+ markAsProgressing (cos , ocv1 .ReasonRollingOut , fmt .Sprintf ("Revision %s is rolling out." , revVersion ), isDeadlineExceeded )
273+ return c .requeueForDeadline (cos ), nil
294274 }
295275
296276 return ctrl.Result {}, nil
297277}
298278
279+ // requeueForDeadline returns a Result that requeues at the progress deadline
280+ // if one is configured and has not yet been exceeded. This ensures that
281+ // ProgressDeadlineExceeded is set promptly even when no object events occur.
282+ func (c * ClusterObjectSetReconciler ) requeueForDeadline (cos * ocv1.ClusterObjectSet ) ctrl.Result {
283+ if remaining , hasDeadline := c .durationUntilDeadline (cos ); hasDeadline && remaining > 0 {
284+ return ctrl.Result {RequeueAfter : remaining }
285+ }
286+ return ctrl.Result {}
287+ }
288+
299289func (c * ClusterObjectSetReconciler ) delete (ctx context.Context , cos * ocv1.ClusterObjectSet ) (ctrl.Result , error ) {
300290 if err := c .TrackingCache .Free (ctx , cos ); err != nil {
301291 markAsAvailableUnknown (cos , ocv1 .ClusterObjectSetReasonReconciling , err .Error ())
@@ -311,11 +301,11 @@ func (c *ClusterObjectSetReconciler) archive(ctx context.Context, revisionEngine
311301 tdres , err := revisionEngine .Teardown (ctx , revision )
312302 if err != nil {
313303 err = fmt .Errorf ("error archiving revision: %v" , err )
314- setRetryingConditions (cos , err .Error ())
304+ setRetryingConditions (cos , err .Error (), false )
315305 return ctrl.Result {}, err
316306 }
317307 if tdres != nil && ! tdres .IsComplete () {
318- setRetryingConditions (cos , "removing revision resources that are not owned by another revision" )
308+ setRetryingConditions (cos , "removing revision resources that are not owned by another revision" , false )
319309 return ctrl.Result {RequeueAfter : 5 * time .Second }, nil
320310 }
321311 // Ensure conditions are set before removing the finalizer when archiving
@@ -333,29 +323,19 @@ type Sourcoser interface {
333323}
334324
335325func (c * ClusterObjectSetReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
336- skipProgressDeadlineExceededPredicate := predicate.Funcs {
337- UpdateFunc : func (e event.UpdateEvent ) bool {
338- rev , ok := e .ObjectNew .(* ocv1.ClusterObjectSet )
339- if ! ok {
340- return true
341- }
342- // allow deletions to happen
343- if ! rev .DeletionTimestamp .IsZero () {
344- return true
345- }
346- if cnd := meta .FindStatusCondition (rev .Status .Conditions , ocv1 .ClusterObjectSetTypeProgressing ); cnd != nil && cnd .Status == metav1 .ConditionFalse && cnd .Reason == ocv1 .ReasonProgressDeadlineExceeded {
347- return false
348- }
349- return true
350- },
351- }
352326 c .Clock = clock.RealClock {}
353327 return ctrl .NewControllerManagedBy (mgr ).
328+ WithOptions (controller.Options {
329+ RateLimiter : newDeadlineAwareRateLimiter (
330+ workqueue .DefaultTypedControllerRateLimiter [ctrl.Request ](),
331+ mgr .GetClient (),
332+ c .Clock ,
333+ ),
334+ }).
354335 For (
355336 & ocv1.ClusterObjectSet {},
356337 builder .WithPredicates (
357338 predicate.ResourceVersionChangedPredicate {},
358- skipProgressDeadlineExceededPredicate ,
359339 ),
360340 ).
361341 WatchesRawSource (
@@ -367,6 +347,51 @@ func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
367347 Complete (c )
368348}
369349
350+ // deadlineAwareRateLimiter wraps a delegate rate limiter and caps the backoff
351+ // duration to the time remaining until the COS progress deadline (+2s), ensuring
352+ // that ProgressDeadlineExceeded is set promptly even during exponential backoff.
353+ type deadlineAwareRateLimiter struct {
354+ delegate workqueue.TypedRateLimiter [ctrl.Request ]
355+ client client.Reader
356+ clock clock.Clock
357+ }
358+
359+ func newDeadlineAwareRateLimiter (
360+ delegate workqueue.TypedRateLimiter [ctrl.Request ],
361+ c client.Reader ,
362+ clk clock.Clock ,
363+ ) * deadlineAwareRateLimiter {
364+ return & deadlineAwareRateLimiter {delegate : delegate , client : c , clock : clk }
365+ }
366+
367+ func (r * deadlineAwareRateLimiter ) When (item ctrl.Request ) time.Duration {
368+ backoff := r .delegate .When (item )
369+
370+ cos := & ocv1.ClusterObjectSet {}
371+ if err := r .client .Get (context .Background (), item .NamespacedName , cos ); err != nil {
372+ return backoff
373+ }
374+
375+ remaining , hasDeadline := durationUntilDeadline (r .clock , cos )
376+ if ! hasDeadline {
377+ return backoff
378+ }
379+
380+ deadline := remaining + 2 * time .Second
381+ if deadline > 0 && deadline < backoff {
382+ return deadline
383+ }
384+ return backoff
385+ }
386+
387+ func (r * deadlineAwareRateLimiter ) Forget (item ctrl.Request ) {
388+ r .delegate .Forget (item )
389+ }
390+
391+ func (r * deadlineAwareRateLimiter ) NumRequeues (item ctrl.Request ) int {
392+ return r .delegate .NumRequeues (item )
393+ }
394+
370395func (c * ClusterObjectSetReconciler ) establishWatch (ctx context.Context , cos * ocv1.ClusterObjectSet , revision boxcutter.RevisionBuilder ) error {
371396 gvks := sets .New [schema.GroupVersionKind ]()
372397 for _ , phase := range revision .GetPhases () {
@@ -631,14 +656,61 @@ func buildProgressionProbes(progressionProbes []ocv1.ProgressionProbe) (probing.
631656 return userProbes , nil
632657}
633658
634- func setRetryingConditions (cos * ocv1.ClusterObjectSet , message string ) {
635- markAsProgressing (cos , ocv1 .ClusterObjectSetReasonRetrying , message )
659+ func setRetryingConditions (cos * ocv1.ClusterObjectSet , message string , isDeadlineExceeded bool ) {
660+ markAsProgressing (cos , ocv1 .ClusterObjectSetReasonRetrying , message , isDeadlineExceeded )
636661 if meta .FindStatusCondition (cos .Status .Conditions , ocv1 .ClusterObjectSetTypeAvailable ) != nil {
637662 markAsAvailableUnknown (cos , ocv1 .ClusterObjectSetReasonReconciling , message )
638663 }
639664}
640665
641- func markAsProgressing (cos * ocv1.ClusterObjectSet , reason , message string ) {
666+ // durationUntilDeadline returns how much time is left before the progress deadline
667+ // is exceeded. A negative duration means the deadline has already passed. If there
668+ // is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded),
669+ // it returns -1 and false.
670+ func (c * ClusterObjectSetReconciler ) durationUntilDeadline (cos * ocv1.ClusterObjectSet ) (time.Duration , bool ) {
671+ return durationUntilDeadline (c .Clock , cos )
672+ }
673+
674+ // durationUntilDeadline returns how much time is left before the progress deadline
675+ // is exceeded. A negative duration means the deadline has already passed. If there
676+ // is no deadline (progressDeadlineMinutes is 0 or the revision has already succeeded),
677+ // it returns -1 and false.
678+ func durationUntilDeadline (clk clock.Clock , cos * ocv1.ClusterObjectSet ) (time.Duration , bool ) {
679+ pd := cos .Spec .ProgressDeadlineMinutes
680+ if pd <= 0 {
681+ return - 1 , false
682+ }
683+ // Succeeded is a latch — once set, it's never cleared. A revision that
684+ // has already succeeded should not be blocked by the deadline, even if
685+ // it temporarily goes back to InTransition (e.g., recovery after drift).
686+ if meta .IsStatusConditionTrue (cos .Status .Conditions , ocv1 .ClusterObjectSetTypeSucceeded ) {
687+ return - 1 , false
688+ }
689+ timeout := time .Duration (pd ) * time .Minute
690+ return timeout - clk .Since (cos .CreationTimestamp .Time ), true
691+ }
692+
693+ // markAsProgressing sets the Progressing condition to True with the given reason.
694+ //
695+ // For non-terminal reasons (RollingOut, Retrying), if isDeadlineExceeded is true,
696+ // the condition is set to Progressing=False/ProgressDeadlineExceeded instead. This
697+ // prevents a reconcile loop where RollingOut and ProgressDeadlineExceeded overwrite
698+ // each other on every cycle.
699+ //
700+ // Terminal reasons (Succeeded) are always applied. Unregistered reasons panic.
701+ func markAsProgressing (cos * ocv1.ClusterObjectSet , reason , message string , isDeadlineExceeded bool ) {
702+ switch reason {
703+ case ocv1 .ReasonSucceeded :
704+ // Terminal — always apply.
705+ case ocv1 .ReasonRollingOut , ocv1 .ClusterObjectSetReasonRetrying :
706+ if isDeadlineExceeded {
707+ markAsNotProgressing (cos , ocv1 .ReasonProgressDeadlineExceeded ,
708+ fmt .Sprintf ("Revision has not rolled out for %d minute(s)." , cos .Spec .ProgressDeadlineMinutes ))
709+ return
710+ }
711+ default :
712+ panic (fmt .Sprintf ("unregistered progressing reason: %q" , reason ))
713+ }
642714 meta .SetStatusCondition (& cos .Status .Conditions , metav1.Condition {
643715 Type : ocv1 .ClusterObjectSetTypeProgressing ,
644716 Status : metav1 .ConditionTrue ,
0 commit comments