From 70a0e83c66d36f201632429bd505f8e99109ba4f Mon Sep 17 00:00:00 2001 From: Rory Z <16801068+Rory-Z@users.noreply.github.com> Date: Fri, 22 Sep 2023 15:21:11 +0800 Subject: [PATCH] style: add retry when update STS/RS Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com> --- controllers/apps/v2beta1/add_emqx_core.go | 128 +++++++++++----------- controllers/apps/v2beta1/add_emqx_repl.go | 128 +++++++++++----------- 2 files changed, 129 insertions(+), 127 deletions(-) diff --git a/controllers/apps/v2beta1/add_emqx_core.go b/controllers/apps/v2beta1/add_emqx_core.go index dbd9cf911..5c519fbef 100644 --- a/controllers/apps/v2beta1/add_emqx_core.go +++ b/controllers/apps/v2beta1/add_emqx_core.go @@ -14,7 +14,7 @@ import ( corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" @@ -27,11 +27,25 @@ type addCore struct { } func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult { - preSts, err := a.getNewStatefulSet(ctx, instance) - if err != nil { - return subResult{err: emperror.Wrap(err, "failed to get new statefulSet")} + logger := log.FromContext(ctx) + preSts := getNewStatefulSet(instance) + updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance) + + patchCalculateFunc := func(storage, new *appsv1.StatefulSet) *patch.PatchResult { + if storage == nil { + return &patch.PatchResult{Patch: []byte("{should create new StatefulSet}")} + } + patchResult, _ := a.Patcher.Calculate( + storage.DeepCopy(), + new.DeepCopy(), + justCheckPodTemplate(), + ) + return patchResult } - if preSts.UID == "" { + if patchResult := patchCalculateFunc(updateSts, preSts); !patchResult.IsEmpty() { + // Create new statefulSet + logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch)) + _ = ctrl.SetControllerReference(instance, preSts, a.Scheme) if err := a.Handler.Create(preSts); err != nil { if k8sErrors.IsAlreadyExists(emperror.Cause(err)) { @@ -44,32 +58,9 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i } return subResult{err: emperror.Wrap(err, "failed to create statefulSet")} } - instance.Status.SetCondition(metav1.Condition{ - Type: appsv2beta1.CoreNodesProgressing, - Status: metav1.ConditionTrue, - Reason: "CreateNewStatefulSet", - Message: "Create new statefulSet", - }) - instance.Status.RemoveCondition(appsv2beta1.Ready) - instance.Status.RemoveCondition(appsv2beta1.Available) - instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) - instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey] - _ = a.Client.Status().Update(ctx, instance) - } else { - storageSts := &appsv1.StatefulSet{} - _ = a.Client.Get(ctx, client.ObjectKeyFromObject(preSts), storageSts) - patchResult, _ := a.Patcher.Calculate(storageSts, preSts, - patch.IgnoreStatusFields(), - patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), - ) - if !patchResult.IsEmpty() { - logger := log.FromContext(ctx) - logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch)) - - if err := a.Handler.Update(preSts); err != nil { - return subResult{err: emperror.Wrap(err, "failed to update statefulSet")} - } - + // Update EMQX status + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) instance.Status.SetCondition(metav1.Condition{ Type: appsv2beta1.CoreNodesProgressing, Status: metav1.ConditionTrue, @@ -79,22 +70,53 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i instance.Status.RemoveCondition(appsv2beta1.Ready) instance.Status.RemoveCondition(appsv2beta1.Available) instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) - _ = a.Client.Status().Update(ctx, instance) + instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey] + return a.Client.Status().Update(ctx, instance) + }); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update EMQX status")} } } + preSts.ObjectMeta = updateSts.DeepCopy().ObjectMeta + preSts.Spec.Template.ObjectMeta = updateSts.DeepCopy().Spec.Template.ObjectMeta + preSts.Spec.Selector = updateSts.DeepCopy().Spec.Selector + if patchResult, _ := a.Patcher.Calculate( + updateSts.DeepCopy(), + preSts.DeepCopy(), + patch.IgnoreStatusFields(), + patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), + ); !patchResult.IsEmpty() { + // Update statefulSet + logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch)) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + storage := &appsv1.StatefulSet{} + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(preSts), storage) + preSts.ResourceVersion = storage.ResourceVersion + return a.Handler.Update(preSts) + }); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update statefulSet")} + } + // Update EMQX status + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) + instance.Status.SetCondition(metav1.Condition{ + Type: appsv2beta1.CoreNodesProgressing, + Status: metav1.ConditionTrue, + Reason: "CreateNewStatefulSet", + Message: "Create new statefulSet", + }) + instance.Status.RemoveCondition(appsv2beta1.Ready) + instance.Status.RemoveCondition(appsv2beta1.Available) + instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) + return a.Client.Status().Update(ctx, instance) + }); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update EMQX status")} + } + } return subResult{} } -func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.EMQX) (*appsv1.StatefulSet, error) { - configMap := &corev1.ConfigMap{} - if err := a.Client.Get(ctx, types.NamespacedName{ - Name: instance.ConfigsNamespacedName().Name, - Namespace: instance.Namespace, - }, configMap); err != nil { - return nil, emperror.Wrap(err, "failed to get configMap") - } - +func getNewStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet { var containerPort corev1.ContainerPort if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil { containerPort = corev1.ContainerPort{ @@ -126,29 +148,7 @@ func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.E {Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))}, }, preSts.Spec.Template.Spec.Containers[0].Env...) - updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance) - if updateSts == nil { - return preSts, nil - } - - patchResult, err := a.Patcher.Calculate( - updateSts.DeepCopy(), - preSts.DeepCopy(), - justCheckPodTemplate(), - ) - if err != nil { - return nil, emperror.Wrap(err, "failed to calculate patch") - } - if patchResult.IsEmpty() { - preSts.ObjectMeta = updateSts.DeepCopy().ObjectMeta - preSts.Spec.Template.ObjectMeta = updateSts.DeepCopy().Spec.Template.ObjectMeta - preSts.Spec.Selector = updateSts.DeepCopy().Spec.Selector - return preSts, nil - } - - logger := log.FromContext(ctx) - logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch)) - return preSts, nil + return preSts } func generateStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet { diff --git a/controllers/apps/v2beta1/add_emqx_repl.go b/controllers/apps/v2beta1/add_emqx_repl.go index 6de376860..66043762b 100644 --- a/controllers/apps/v2beta1/add_emqx_repl.go +++ b/controllers/apps/v2beta1/add_emqx_repl.go @@ -13,7 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" @@ -33,11 +33,26 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i return subResult{} } - preRs, err := a.getNewReplicaSet(ctx, instance) - if err != nil { - return subResult{err: emperror.Wrap(err, "failed to get new replicaSet")} + logger := log.FromContext(ctx) + preRs := getNewReplicaSet(instance) + updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance) + + patchCalculateFunc := func(storage, new *appsv1.ReplicaSet) *patch.PatchResult { + if storage == nil { + return &patch.PatchResult{Patch: []byte("{should create new ReplicaSet}")} + } + patchResult, _ := a.Patcher.Calculate( + storage.DeepCopy(), + new.DeepCopy(), + justCheckPodTemplate(), + ) + return patchResult } - if preRs.UID == "" { + + if patchResult := patchCalculateFunc(updateRs, preRs); !patchResult.IsEmpty() { + //Crete Rs + logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch)) + _ = ctrl.SetControllerReference(instance, preRs, a.Scheme) if err := a.Handler.Create(preRs); err != nil { if k8sErrors.IsAlreadyExists(emperror.Cause(err)) { @@ -50,32 +65,48 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i } return subResult{err: emperror.Wrap(err, "failed to create replicaSet")} } - instance.Status.SetCondition(metav1.Condition{ - Type: appsv2beta1.ReplicantNodesProgressing, - Status: metav1.ConditionTrue, - Reason: "CreateNewReplicaSet", - Message: "Create new replicaSet", - }) - instance.Status.RemoveCondition(appsv2beta1.Ready) - instance.Status.RemoveCondition(appsv2beta1.Available) - instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) - instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey] - _ = a.Client.Status().Update(ctx, instance) - } else { - storageRs := &appsv1.ReplicaSet{} - _ = a.Client.Get(ctx, client.ObjectKeyFromObject(preRs), storageRs) - patchResult, _ := a.Patcher.Calculate(storageRs, preRs, - patch.IgnoreStatusFields(), - patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), - ) - if !patchResult.IsEmpty() { - logger := log.FromContext(ctx) - logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch)) - if err := a.Handler.Update(preRs); err != nil { - return subResult{err: emperror.Wrap(err, "failed to update replicaSet")} - } + // Update EMQX status + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) + instance.Status.SetCondition(metav1.Condition{ + Type: appsv2beta1.ReplicantNodesProgressing, + Status: metav1.ConditionTrue, + Reason: "CreateNewReplicaSet", + Message: "Create new replicaSet", + }) + instance.Status.RemoveCondition(appsv2beta1.Ready) + instance.Status.RemoveCondition(appsv2beta1.Available) + instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) + instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey] + return a.Client.Status().Update(ctx, instance) + }); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update EMQX status")} + } + } + preRs.ObjectMeta = updateRs.DeepCopy().ObjectMeta + preRs.Spec.Template.ObjectMeta = updateRs.DeepCopy().Spec.Template.ObjectMeta + preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector + if patchResult, _ := a.Patcher.Calculate( + updateRs.DeepCopy(), + preRs.DeepCopy(), + patch.IgnoreStatusFields(), + patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), + ); !patchResult.IsEmpty() { + // Update replicaSet + logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch)) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + storage := &appsv1.ReplicaSet{} + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(preRs), storage) + preRs.ResourceVersion = storage.ResourceVersion + return a.Handler.Update(preRs) + }); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update replicaSet")} + } + // Update EMQX status + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) instance.Status.SetCondition(metav1.Condition{ Type: appsv2beta1.ReplicantNodesProgressing, Status: metav1.ConditionTrue, @@ -85,22 +116,16 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i instance.Status.RemoveCondition(appsv2beta1.Ready) instance.Status.RemoveCondition(appsv2beta1.Available) instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) - _ = a.Client.Status().Update(ctx, instance) + return a.Client.Status().Update(ctx, instance) + }); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update EMQX status")} } } return subResult{} } -func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EMQX) (*appsv1.ReplicaSet, error) { - configMap := &corev1.ConfigMap{} - if err := a.Client.Get(ctx, types.NamespacedName{ - Name: instance.ConfigsNamespacedName().Name, - Namespace: instance.Namespace, - }, configMap); err != nil { - return nil, emperror.Wrap(err, "failed to get configMap") - } - +func getNewReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet { var containerPort corev1.ContainerPort if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil { containerPort = corev1.ContainerPort{ @@ -132,30 +157,7 @@ func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EM {Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))}, }, preRs.Spec.Template.Spec.Containers[0].Env...) - updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance) - if updateRs == nil { - return preRs, nil - } - - patchResult, err := a.Patcher.Calculate( - updateRs.DeepCopy(), - preRs.DeepCopy(), - justCheckPodTemplate(), - ) - if err != nil { - return nil, emperror.Wrap(err, "failed to calculate patch result") - } - if patchResult.IsEmpty() { - preRs.ObjectMeta = updateRs.DeepCopy().ObjectMeta - preRs.Spec.Template.ObjectMeta = updateRs.DeepCopy().Spec.Template.ObjectMeta - preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector - return preRs, nil - } - - logger := log.FromContext(ctx) - logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch)) - - return preRs, nil + return preRs } func generateReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet {