Skip to content

Commit

Permalink
style: add retry when update STS/RS
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Sep 22, 2023
1 parent b50db7f commit 6520451
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 137 deletions.
125 changes: 61 additions & 64 deletions controllers/apps/v2beta1/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -27,11 +27,25 @@ type addCore struct {
}

func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult {
preSts, err := a.getNewStatefulSet(ctx, instance)
if err != nil {
return subResult{err: emperror.Wrap(err, "failed to get new statefulSet")}
logger := log.FromContext(ctx)
preSts := getNewStatefulSet(instance)
updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance)

patchCalculateFunc := func(storage, new *appsv1.StatefulSet) *patch.PatchResult {
if storage == nil {
return &patch.PatchResult{Patch: []byte("{should create new StatefulSet}")}
}
patchResult, _ := a.Patcher.Calculate(
storage.DeepCopy(),
new.DeepCopy(),
justCheckPodTemplate(),
)
return patchResult
}
if preSts.UID == "" {
if patchResult := patchCalculateFunc(updateSts, preSts); !patchResult.IsEmpty() {
// Create new statefulSet
logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch))

_ = ctrl.SetControllerReference(instance, preSts, a.Scheme)
if err := a.Handler.Create(preSts); err != nil {
if k8sErrors.IsAlreadyExists(emperror.Cause(err)) {
Expand All @@ -44,32 +58,46 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}
return subResult{err: emperror.Wrap(err, "failed to create statefulSet")}
}
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewStatefulSet",
Message: "Create new statefulSet",
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewStatefulSet",
Message: "Create new statefulSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
return a.Client.Status().Update(ctx, instance)
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
instance.Status.CoreNodesStatus.UpdateRevision = preSts.Labels[appsv2beta1.LabelsPodTemplateHashKey]
_ = a.Client.Status().Update(ctx, instance)
} else {
storageSts := &appsv1.StatefulSet{}
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(preSts), storageSts)
patchResult, _ := a.Patcher.Calculate(storageSts, preSts,
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
)
if !patchResult.IsEmpty() {
logger := log.FromContext(ctx)
logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch))

if err := a.Handler.Update(preSts); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update statefulSet")}
}
return subResult{}
}

preSts.ObjectMeta = updateSts.DeepCopy().ObjectMeta
preSts.Spec.Template.ObjectMeta = updateSts.DeepCopy().Spec.Template.ObjectMeta
preSts.Spec.Selector = updateSts.DeepCopy().Spec.Selector
if patchResult, _ := a.Patcher.Calculate(
updateSts.DeepCopy(),
preSts.DeepCopy(),
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
); !patchResult.IsEmpty() {
// Update statefulSet
logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch))
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
storage := &appsv1.StatefulSet{}
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(preSts), storage)
preSts.ResourceVersion = storage.ResourceVersion
return a.Handler.Update(preSts)
}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update statefulSet")}
}
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Expand All @@ -79,22 +107,13 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.CoreNodesReady)
_ = a.Client.Status().Update(ctx, instance)
}
return a.Client.Status().Update(ctx, instance)
})
}

return subResult{}
}

func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.EMQX) (*appsv1.StatefulSet, error) {
configMap := &corev1.ConfigMap{}
if err := a.Client.Get(ctx, types.NamespacedName{
Name: instance.ConfigsNamespacedName().Name,
Namespace: instance.Namespace,
}, configMap); err != nil {
return nil, emperror.Wrap(err, "failed to get configMap")
}

func getNewStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
var containerPort corev1.ContainerPort
if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil {
containerPort = corev1.ContainerPort{
Expand Down Expand Up @@ -126,29 +145,7 @@ func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.E
{Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))},
}, preSts.Spec.Template.Spec.Containers[0].Env...)

updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance)
if updateSts == nil {
return preSts, nil
}

patchResult, err := a.Patcher.Calculate(
updateSts.DeepCopy(),
preSts.DeepCopy(),
justCheckPodTemplate(),
)
if err != nil {
return nil, emperror.Wrap(err, "failed to calculate patch")
}
if patchResult.IsEmpty() {
preSts.ObjectMeta = updateSts.DeepCopy().ObjectMeta
preSts.Spec.Template.ObjectMeta = updateSts.DeepCopy().Spec.Template.ObjectMeta
preSts.Spec.Selector = updateSts.DeepCopy().Spec.Selector
return preSts, nil
}

logger := log.FromContext(ctx)
logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch))
return preSts, nil
return preSts
}

func generateStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
Expand Down
4 changes: 0 additions & 4 deletions controllers/apps/v2beta1/add_emqx_core_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
Expect(k8sClient.Create(context.TODO(), ns)).Should(Succeed())
})

It("create configMap", func() {
Expect((&syncConfig{emqxReconciler}).reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
})

It("should create statefulSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.StatefulSet {
Expand Down
128 changes: 63 additions & 65 deletions controllers/apps/v2beta1/add_emqx_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -33,11 +33,26 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
return subResult{}
}

preRs, err := a.getNewReplicaSet(ctx, instance)
if err != nil {
return subResult{err: emperror.Wrap(err, "failed to get new replicaSet")}
logger := log.FromContext(ctx)
preRs := getNewReplicaSet(instance)
updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance)

patchCalculateFunc := func(storage, new *appsv1.ReplicaSet) *patch.PatchResult {
if storage == nil {
return &patch.PatchResult{Patch: []byte("{should create new ReplicaSet}")}
}
patchResult, _ := a.Patcher.Calculate(
storage.DeepCopy(),
new.DeepCopy(),
justCheckPodTemplate(),
)
return patchResult
}
if preRs.UID == "" {

if patchResult := patchCalculateFunc(updateRs, preRs); !patchResult.IsEmpty() {
//Crete Rs
logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch))

_ = ctrl.SetControllerReference(instance, preRs, a.Scheme)
if err := a.Handler.Create(preRs); err != nil {
if k8sErrors.IsAlreadyExists(emperror.Cause(err)) {
Expand All @@ -50,32 +65,10 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
}
return subResult{err: emperror.Wrap(err, "failed to create replicaSet")}
}
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewReplicaSet",
Message: "Create new replicaSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
_ = a.Client.Status().Update(ctx, instance)
} else {
storageRs := &appsv1.ReplicaSet{}
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(preRs), storageRs)
patchResult, _ := a.Patcher.Calculate(storageRs, preRs,
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
)
if !patchResult.IsEmpty() {
logger := log.FromContext(ctx)
logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch))

if err := a.Handler.Update(preRs); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update replicaSet")}
}

// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Expand All @@ -85,22 +78,50 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
_ = a.Client.Status().Update(ctx, instance)
}
instance.Status.ReplicantNodesStatus.UpdateRevision = preRs.Labels[appsv2beta1.LabelsPodTemplateHashKey]
return a.Client.Status().Update(ctx, instance)
})
return subResult{}
}

preRs.ObjectMeta = updateRs.DeepCopy().ObjectMeta
preRs.Spec.Template.ObjectMeta = updateRs.DeepCopy().Spec.Template.ObjectMeta
preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector
if patchResult, _ := a.Patcher.Calculate(
updateRs.DeepCopy(),
preRs.DeepCopy(),
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
); !patchResult.IsEmpty() {
// Update replicaSet
logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch))
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
storage := &appsv1.ReplicaSet{}
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(preRs), storage)
preRs.ResourceVersion = storage.ResourceVersion
return a.Handler.Update(preRs)
}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update replicaSet")}
}
// Update EMQX status
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(instance), instance)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewReplicaSet",
Message: "Create new replicaSet",
})
instance.Status.RemoveCondition(appsv2beta1.Ready)
instance.Status.RemoveCondition(appsv2beta1.Available)
instance.Status.RemoveCondition(appsv2beta1.ReplicantNodesReady)
return a.Client.Status().Update(ctx, instance)
})
}
return subResult{}
}

func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EMQX) (*appsv1.ReplicaSet, error) {
configMap := &corev1.ConfigMap{}
if err := a.Client.Get(ctx, types.NamespacedName{
Name: instance.ConfigsNamespacedName().Name,
Namespace: instance.Namespace,
}, configMap); err != nil {
return nil, emperror.Wrap(err, "failed to get configMap")
}

func getNewReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet {
var containerPort corev1.ContainerPort
if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil {
containerPort = corev1.ContainerPort{
Expand Down Expand Up @@ -132,30 +153,7 @@ func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EM
{Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))},
}, preRs.Spec.Template.Spec.Containers[0].Env...)

updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance)
if updateRs == nil {
return preRs, nil
}

patchResult, err := a.Patcher.Calculate(
updateRs.DeepCopy(),
preRs.DeepCopy(),
justCheckPodTemplate(),
)
if err != nil {
return nil, emperror.Wrap(err, "failed to calculate patch result")
}
if patchResult.IsEmpty() {
preRs.ObjectMeta = updateRs.DeepCopy().ObjectMeta
preRs.Spec.Template.ObjectMeta = updateRs.DeepCopy().Spec.Template.ObjectMeta
preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector
return preRs, nil
}

logger := log.FromContext(ctx)
logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch))

return preRs, nil
return preRs
}

func generateReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet {
Expand Down
4 changes: 0 additions & 4 deletions controllers/apps/v2beta1/add_emqx_repl_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
Expect(k8sClient.Create(context.TODO(), ns)).Should(Succeed())
})

It("create configMap", func() {
Expect((&syncConfig{emqxReconciler}).reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
})

Context("replicant template is nil", func() {
JustBeforeEach(func() {
instance.Spec.ReplicantTemplate = nil
Expand Down

0 comments on commit 6520451

Please sign in to comment.