diff --git a/controllers/apps/v2beta1/add_emqx_core.go b/controllers/apps/v2beta1/add_emqx_core.go index dbd9cf911..ba5ee7e1f 100644 --- a/controllers/apps/v2beta1/add_emqx_core.go +++ b/controllers/apps/v2beta1/add_emqx_core.go @@ -14,7 +14,7 @@ import ( corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" @@ -27,11 +27,25 @@ type addCore struct { } func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult { - preSts, err := a.getNewStatefulSet(ctx, instance) - if err != nil { - return subResult{err: emperror.Wrap(err, "failed to get new statefulSet")} + logger := log.FromContext(ctx) + preSts := getNewStatefulSet(instance) + updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance) + + patchCalculateFunc := func(storage, new *appsv1.StatefulSet) *patch.PatchResult { + if storage == nil { + return &patch.PatchResult{Patch: []byte("{should create new StatefulSet}")} + } + patchResult, _ := a.Patcher.Calculate( + storage.DeepCopy(), + new.DeepCopy(), + justCheckPodTemplate(), + ) + return patchResult } - if preSts.UID == "" { + if patchResult := patchCalculateFunc(updateSts, preSts); !patchResult.IsEmpty() { + // Create new statefulSet + logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch)) + _ = ctrl.SetControllerReference(instance, preSts, a.Scheme) if err := a.Handler.Create(preSts); err != nil { if k8sErrors.IsAlreadyExists(emperror.Cause(err)) { @@ -44,32 +58,46 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i } return subResult{err: emperror.Wrap(err, "failed to create statefulSet")} } - instance.Status.SetCondition(metav1.Condition{ - Type: appsv2beta1.CoreNodesProgressing, - Status: metav1.ConditionTrue, - Reason: "CreateNewStatefulSet", - Message: "Create new statefulSet", + // Update EMQX status + _ = retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) + instance.Status.SetCondition(metav1.Condition{ + Type: appsv2beta1.CoreNodesProgressing, + Status: metav1.ConditionTrue, + Reason: "CreateNewStatefulSet", + Message: "Create new statefulSet", + }) + instance.Status.RemoveCondition(appsv2beta1.Ready) + instance.Status.RemoveCondition(appsv2beta1.Available) + instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) + instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey] + return a.Client.Status().Update(ctx, instance) }) - instance.Status.RemoveCondition(appsv2beta1.Ready) - instance.Status.RemoveCondition(appsv2beta1.Available) - instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) - instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey] - _ = a.Client.Status().Update(ctx, instance) - } else { - storageSts := &appsv1.StatefulSet{} - _ = a.Client.Get(ctx, client.ObjectKeyFromObject(preSts), storageSts) - patchResult, _ := a.Patcher.Calculate(storageSts, preSts, - patch.IgnoreStatusFields(), - patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), - ) - if !patchResult.IsEmpty() { - logger := log.FromContext(ctx) - logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch)) - - if err := a.Handler.Update(preSts); err != nil { - return subResult{err: emperror.Wrap(err, "failed to update statefulSet")} - } + return subResult{} + } + preSts.ObjectMeta = updateSts.DeepCopy().ObjectMeta + preSts.Spec.Template.ObjectMeta = updateSts.DeepCopy().Spec.Template.ObjectMeta + preSts.Spec.Selector = updateSts.DeepCopy().Spec.Selector + if patchResult, _ := a.Patcher.Calculate( + updateSts.DeepCopy(), + preSts.DeepCopy(), + patch.IgnoreStatusFields(), + patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), + ); !patchResult.IsEmpty() { + // Update statefulSet + logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch)) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + storage := &appsv1.StatefulSet{} + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(preSts), storage) + preSts.ResourceVersion = storage.ResourceVersion + return a.Handler.Update(preSts) + }); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update statefulSet")} + } + // Update EMQX status + _ = retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) instance.Status.SetCondition(metav1.Condition{ Type: appsv2beta1.CoreNodesProgressing, Status: metav1.ConditionTrue, @@ -79,22 +107,13 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i instance.Status.RemoveCondition(appsv2beta1.Ready) instance.Status.RemoveCondition(appsv2beta1.Available) instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady) - _ = a.Client.Status().Update(ctx, instance) - } + return a.Client.Status().Update(ctx, instance) + }) } - return subResult{} } -func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.EMQX) (*appsv1.StatefulSet, error) { - configMap := &corev1.ConfigMap{} - if err := a.Client.Get(ctx, types.NamespacedName{ - Name: instance.ConfigsNamespacedName().Name, - Namespace: instance.Namespace, - }, configMap); err != nil { - return nil, emperror.Wrap(err, "failed to get configMap") - } - +func getNewStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet { var containerPort corev1.ContainerPort if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil { containerPort = corev1.ContainerPort{ @@ -126,29 +145,7 @@ func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.E {Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))}, }, preSts.Spec.Template.Spec.Containers[0].Env...) - updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance) - if updateSts == nil { - return preSts, nil - } - - patchResult, err := a.Patcher.Calculate( - updateSts.DeepCopy(), - preSts.DeepCopy(), - justCheckPodTemplate(), - ) - if err != nil { - return nil, emperror.Wrap(err, "failed to calculate patch") - } - if patchResult.IsEmpty() { - preSts.ObjectMeta = updateSts.DeepCopy().ObjectMeta - preSts.Spec.Template.ObjectMeta = updateSts.DeepCopy().Spec.Template.ObjectMeta - preSts.Spec.Selector = updateSts.DeepCopy().Spec.Selector - return preSts, nil - } - - logger := log.FromContext(ctx) - logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch)) - return preSts, nil + return preSts } func generateStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet { diff --git a/controllers/apps/v2beta1/add_emqx_core_suite_test.go b/controllers/apps/v2beta1/add_emqx_core_suite_test.go index 28243b2a6..1e4c6016b 100644 --- a/controllers/apps/v2beta1/add_emqx_core_suite_test.go +++ b/controllers/apps/v2beta1/add_emqx_core_suite_test.go @@ -53,10 +53,6 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() { Expect(k8sClient.Create(context.TODO(), ns)).Should(Succeed()) }) - It("create configMap", func() { - Expect((&syncConfig{emqxReconciler}).reconcile(ctx, instance, nil)).Should(Equal(subResult{})) - }) - It("should create statefulSet", func() { Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{})) Eventually(func() []appsv1.StatefulSet { diff --git a/controllers/apps/v2beta1/add_emqx_repl.go b/controllers/apps/v2beta1/add_emqx_repl.go index 6de376860..445eb0e92 100644 --- a/controllers/apps/v2beta1/add_emqx_repl.go +++ b/controllers/apps/v2beta1/add_emqx_repl.go @@ -13,7 +13,7 @@ import ( corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/klog/v2" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" @@ -33,11 +33,26 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i return subResult{} } - preRs, err := a.getNewReplicaSet(ctx, instance) - if err != nil { - return subResult{err: emperror.Wrap(err, "failed to get new replicaSet")} + logger := log.FromContext(ctx) + preRs := getNewReplicaSet(instance) + updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance) + + patchCalculateFunc := func(storage, new *appsv1.ReplicaSet) *patch.PatchResult { + if storage == nil { + return &patch.PatchResult{Patch: []byte("{should create new ReplicaSet}")} + } + patchResult, _ := a.Patcher.Calculate( + storage.DeepCopy(), + new.DeepCopy(), + justCheckPodTemplate(), + ) + return patchResult } - if preRs.UID == "" { + + if patchResult := patchCalculateFunc(updateRs, preRs); !patchResult.IsEmpty() { + //Crete Rs + logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch)) + _ = ctrl.SetControllerReference(instance, preRs, a.Scheme) if err := a.Handler.Create(preRs); err != nil { if k8sErrors.IsAlreadyExists(emperror.Cause(err)) { @@ -50,32 +65,10 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i } return subResult{err: emperror.Wrap(err, "failed to create replicaSet")} } - instance.Status.SetCondition(metav1.Condition{ - Type: appsv2beta1.ReplicantNodesProgressing, - Status: metav1.ConditionTrue, - Reason: "CreateNewReplicaSet", - Message: "Create new replicaSet", - }) - instance.Status.RemoveCondition(appsv2beta1.Ready) - instance.Status.RemoveCondition(appsv2beta1.Available) - instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) - instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey] - _ = a.Client.Status().Update(ctx, instance) - } else { - storageRs := &appsv1.ReplicaSet{} - _ = a.Client.Get(ctx, client.ObjectKeyFromObject(preRs), storageRs) - patchResult, _ := a.Patcher.Calculate(storageRs, preRs, - patch.IgnoreStatusFields(), - patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), - ) - if !patchResult.IsEmpty() { - logger := log.FromContext(ctx) - logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch)) - - if err := a.Handler.Update(preRs); err != nil { - return subResult{err: emperror.Wrap(err, "failed to update replicaSet")} - } + // Update EMQX status + _ = retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) instance.Status.SetCondition(metav1.Condition{ Type: appsv2beta1.ReplicantNodesProgressing, Status: metav1.ConditionTrue, @@ -85,22 +78,50 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i instance.Status.RemoveCondition(appsv2beta1.Ready) instance.Status.RemoveCondition(appsv2beta1.Available) instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) - _ = a.Client.Status().Update(ctx, instance) - } + instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey] + return a.Client.Status().Update(ctx, instance) + }) + return subResult{} } + preRs.ObjectMeta = updateRs.DeepCopy().ObjectMeta + preRs.Spec.Template.ObjectMeta = updateRs.DeepCopy().Spec.Template.ObjectMeta + preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector + if patchResult, _ := a.Patcher.Calculate( + updateRs.DeepCopy(), + preRs.DeepCopy(), + patch.IgnoreStatusFields(), + patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(), + ); !patchResult.IsEmpty() { + // Update replicaSet + logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch)) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + storage := &appsv1.ReplicaSet{} + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(preRs), storage) + preRs.ResourceVersion = storage.ResourceVersion + return a.Handler.Update(preRs) + }); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update replicaSet")} + } + // Update EMQX status + _ = retry.RetryOnConflict(retry.DefaultRetry, func() error { + _ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance) + instance.Status.SetCondition(metav1.Condition{ + Type: appsv2beta1.ReplicantNodesProgressing, + Status: metav1.ConditionTrue, + Reason: "CreateNewReplicaSet", + Message: "Create new replicaSet", + }) + instance.Status.RemoveCondition(appsv2beta1.Ready) + instance.Status.RemoveCondition(appsv2beta1.Available) + instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady) + return a.Client.Status().Update(ctx, instance) + }) + } return subResult{} } -func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EMQX) (*appsv1.ReplicaSet, error) { - configMap := &corev1.ConfigMap{} - if err := a.Client.Get(ctx, types.NamespacedName{ - Name: instance.ConfigsNamespacedName().Name, - Namespace: instance.Namespace, - }, configMap); err != nil { - return nil, emperror.Wrap(err, "failed to get configMap") - } - +func getNewReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet { var containerPort corev1.ContainerPort if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil { containerPort = corev1.ContainerPort{ @@ -132,30 +153,7 @@ func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EM {Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))}, }, preRs.Spec.Template.Spec.Containers[0].Env...) - updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance) - if updateRs == nil { - return preRs, nil - } - - patchResult, err := a.Patcher.Calculate( - updateRs.DeepCopy(), - preRs.DeepCopy(), - justCheckPodTemplate(), - ) - if err != nil { - return nil, emperror.Wrap(err, "failed to calculate patch result") - } - if patchResult.IsEmpty() { - preRs.ObjectMeta = updateRs.DeepCopy().ObjectMeta - preRs.Spec.Template.ObjectMeta = updateRs.DeepCopy().Spec.Template.ObjectMeta - preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector - return preRs, nil - } - - logger := log.FromContext(ctx) - logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch)) - - return preRs, nil + return preRs } func generateReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet { diff --git a/controllers/apps/v2beta1/add_emqx_repl_suite_test.go b/controllers/apps/v2beta1/add_emqx_repl_suite_test.go index 7735ba1bc..990842fd3 100644 --- a/controllers/apps/v2beta1/add_emqx_repl_suite_test.go +++ b/controllers/apps/v2beta1/add_emqx_repl_suite_test.go @@ -62,10 +62,6 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() { Expect(k8sClient.Create(context.TODO(), ns)).Should(Succeed()) }) - It("create configMap", func() { - Expect((&syncConfig{emqxReconciler}).reconcile(ctx, instance, nil)).Should(Equal(subResult{})) - }) - Context("replicant template is nil", func() { JustBeforeEach(func() { instance.Spec.ReplicantTemplate = nil