Skip to content

Commit

Permalink
feat(v2alpha2): when delete pod, evaluation mqtt session
Browse files Browse the repository at this point in the history
  • Loading branch information
Rory-Z committed Jun 29, 2023
1 parent c636939 commit efd57aa
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 146 deletions.
101 changes: 56 additions & 45 deletions controllers/apps/v2alpha2/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package v2alpha2

import (
"context"
"fmt"
"reflect"
"strings"

emperror "emperror.dev/errors"
appsv2alpha2 "github.com/emqx/emqx-operator/apis/apps/v2alpha2"
Expand All @@ -11,9 +13,9 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -47,7 +49,7 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _
_ = a.Client.Status().Update(ctx, instance)
}

if err := a.syncStatefulSet(ctx, instance); err != nil {
if err := a.sync(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to sync replicaSet")}
}

Expand All @@ -56,71 +58,80 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _

func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2alpha2.EMQX) *appsv1.StatefulSet {
preSts := generateStatefulSet(instance)

list := &appsv1.StatefulSetList{}
_ = a.Client.List(ctx, list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(appsv2alpha2.CloneAndAddLabel(
instance.Spec.CoreTemplate.Labels,
appsv2alpha2.PodTemplateHashLabelKey,
instance.Status.CoreNodesStatus.CurrentRevision,
)),
)
if len(list.Items) > 0 {
sts := list.Items[0].DeepCopy()
patchResult, _ := a.Patcher.Calculate(
sts,
preSts.DeepCopy(),
justCheckPodTemplate(),
)
if patchResult.IsEmpty() {
preSts.ObjectMeta = sts.ObjectMeta
preSts.Spec.Template.ObjectMeta = sts.Spec.Template.ObjectMeta
preSts.Spec.Selector = sts.Spec.Selector
return preSts
}
logger := log.FromContext(ctx)
logger.V(1).Info("got different pod template for EMQX core nodes, will create new statefulSet", "patch", string(patchResult.Patch))
}

podTemplateSpecHash := computeHash(preSts.Spec.Template.DeepCopy(), instance.Status.CoreNodesStatus.CollisionCount)
preSts.Name = preSts.Name + "-" + podTemplateSpecHash
preSts.Labels = appsv2alpha2.CloneAndAddLabel(preSts.Labels, appsv2alpha2.PodTemplateHashLabelKey, podTemplateSpecHash)
preSts.Spec.Template.Labels = appsv2alpha2.CloneAndAddLabel(preSts.Spec.Template.Labels, appsv2alpha2.PodTemplateHashLabelKey, podTemplateSpecHash)
preSts.Spec.Selector = appsv2alpha2.CloneSelectorAndAddLabel(preSts.Spec.Selector, appsv2alpha2.PodTemplateHashLabelKey, podTemplateSpecHash)

currentSts, _ := getStateFulSetList(ctx, a.Client, instance)
if currentSts == nil {
return preSts
}

patchResult, _ := a.Patcher.Calculate(
currentSts.DeepCopy(),
preSts.DeepCopy(),
justCheckPodTemplate(),
)
if patchResult.IsEmpty() {
preSts.ObjectMeta = currentSts.ObjectMeta
preSts.Spec.Template.ObjectMeta = currentSts.Spec.Template.ObjectMeta
preSts.Spec.Selector = currentSts.Spec.Selector
return preSts
}

logger := log.FromContext(ctx)
logger.V(1).Info("got different pod template for EMQX core nodes, will create new statefulSet", "patch", string(patchResult.Patch))
return preSts
}

func (a *addCore) syncStatefulSet(ctx context.Context, instance *appsv2alpha2.EMQX) error {
func (a *addCore) sync(ctx context.Context, instance *appsv2alpha2.EMQX) error {
if isExistReplicant(instance) {
rsList := getReplicaSetList(ctx, a.Client,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.ReplicantTemplate.Labels),
)
if len(rsList) != 1 {
_, oldRsList := getReplicaSetList(ctx, a.Client, instance)
if len(oldRsList) != 0 {
// wait for replicaSet finished the scale down
return nil
}
}

stsList := getStateFulSetList(ctx, a.Client,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.CoreTemplate.Labels),
)
if len(stsList) <= 1 {
_, oldStsList := getStateFulSetList(ctx, a.Client, instance)
if len(oldStsList) == 0 {
return nil
}

old := stsList[0].DeepCopy()
eList := getEventList(ctx, a.Clientset, old)
oldest := oldStsList[0].DeepCopy()

if canBeScaledDown(instance, appsv2alpha2.Ready, eList) {
old.Spec.Replicas = pointer.Int32Ptr(old.Status.Replicas - 1)
if err := a.Client.Update(ctx, old); err != nil {
if a.findCanBeDeletePod(ctx, instance, oldest) != nil {
oldest.Spec.Replicas = pointer.Int32Ptr(oldest.Status.Replicas - 1)
if err := a.Client.Update(ctx, oldest); err != nil {
return emperror.Wrap(err, "failed to scale down old replicaSet")
}
return nil
}

return nil
}

func (a *addCore) findCanBeDeletePod(ctx context.Context, instance *appsv2alpha2.EMQX, old *appsv1.StatefulSet) *corev1.Pod {
if !canBeScaledDown(instance, appsv2alpha2.Ready, getEventList(ctx, a.Clientset, old)) {
return nil
}
pod := &corev1.Pod{}
_ = a.Client.Get(ctx, types.NamespacedName{
Namespace: instance.Namespace,
Name: fmt.Sprintf("%s-%d", old.Name, old.Status.Replicas-1),
}, pod)

for _, node := range instance.Status.CoreNodesStatus.Nodes {
host := strings.Split(node.Node[strings.Index(node.Node, "@")+1:], ":")[0]
if strings.HasPrefix(host, pod.Name) {
if node.Edition == "Enterprise" && node.Session != 0 {
return nil
}
return pod
}
}
return nil
}

Expand Down
49 changes: 28 additions & 21 deletions controllers/apps/v2alpha2/add_emqx_core_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package v2alpha2

import (
"context"
"errors"
"fmt"
"sort"
"time"

appsv2alpha2 "github.com/emqx/emqx-operator/apis/apps/v2alpha2"
Expand Down Expand Up @@ -38,7 +39,18 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {

instance = emqx.DeepCopy()
instance.Namespace = ns.Name

instance.Status.Conditions = []metav1.Condition{
{
Type: appsv2alpha2.Ready,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
{
Type: appsv2alpha2.CodeNodesReady,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
}
})

It("create namespace", func() {
Expand Down Expand Up @@ -115,33 +127,28 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
var old, new *appsv1.StatefulSet = new(appsv1.StatefulSet), new(appsv1.StatefulSet)

JustBeforeEach(func() {
Eventually(func() error {
list := getStateFulSetList(ctx, a.Client,
list := &appsv1.StatefulSetList{}
Eventually(func() []appsv1.StatefulSet {
_ = k8sClient.List(ctx, list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.CoreTemplate.Labels),
)
if len(list) == 0 {
return errors.New("not found")
}
old = list[0].DeepCopy()
new = list[len(list)-1].DeepCopy()
return nil
}).Should(Succeed())
Expect(old.UID).ShouldNot(Equal(new.UID))
sort.Slice(list.Items, func(i, j int) bool {
return list.Items[i].CreationTimestamp.Before(&list.Items[j].CreationTimestamp)
})
return list.Items
}).Should(HaveLen(2))
old = list.Items[0].DeepCopy()
new = list.Items[1].DeepCopy()

//Sync the "change image" test case.
instance.Spec.Image = new.Spec.Template.Spec.Containers[0].Image
instance.Status.CoreNodesStatus.CurrentRevision = new.Labels[appsv2alpha2.PodTemplateHashLabelKey]
instance.Status.Conditions = []metav1.Condition{
{
Type: appsv2alpha2.Ready,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
instance.Status.CoreNodesStatus.Nodes = []appsv2alpha2.EMQXNode{
{
Type: appsv2alpha2.CodeNodesReady,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
Node: fmt.Sprintf("emqx@%s.fake", fmt.Sprintf("%s-%d", new.Name, *new.Spec.Replicas)),
Edition: "Enterprise",
Session: 0,
},
}

Expand Down
116 changes: 77 additions & 39 deletions controllers/apps/v2alpha2/add_emqx_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package v2alpha2

import (
"context"
"sort"
"strings"

emperror "emperror.dev/errors"
appsv2alpha2 "github.com/emqx/emqx-operator/apis/apps/v2alpha2"
Expand Down Expand Up @@ -54,7 +56,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _
_ = a.Client.Status().Update(ctx, instance)
}

if err := a.syncReplicaSet(ctx, instance); err != nil {
if err := a.sync(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to sync replicaSet")}
}

Expand All @@ -63,64 +65,100 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _

func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2alpha2.EMQX) *appsv1.ReplicaSet {
preRs := generateReplicaSet(instance)

list := &appsv1.ReplicaSetList{}
_ = a.Client.List(context.TODO(), list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(appsv2alpha2.CloneAndAddLabel(
instance.Spec.ReplicantTemplate.Labels,
appsv2alpha2.PodTemplateHashLabelKey,
instance.Status.ReplicantNodesStatus.CurrentRevision,
)),
)
if len(list.Items) > 0 {
rs := list.Items[0].DeepCopy()
patchResult, _ := a.Patcher.Calculate(
rs,
preRs.DeepCopy(),
justCheckPodTemplate(),
)
if patchResult.IsEmpty() {
preRs.ObjectMeta = rs.ObjectMeta
preRs.Spec.Template.ObjectMeta = rs.Spec.Template.ObjectMeta
preRs.Spec.Selector = rs.Spec.Selector
return preRs
}
logger := log.FromContext(ctx)
logger.V(1).Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "patch", string(patchResult.Patch))
}

podTemplateSpecHash := computeHash(preRs.Spec.Template.DeepCopy(), instance.Status.ReplicantNodesStatus.CollisionCount)
preRs.Name = preRs.Name + "-" + podTemplateSpecHash
preRs.Labels = appsv2alpha2.CloneAndAddLabel(preRs.Labels, appsv2alpha2.PodTemplateHashLabelKey, podTemplateSpecHash)
preRs.Spec.Template.Labels = appsv2alpha2.CloneAndAddLabel(preRs.Spec.Template.Labels, appsv2alpha2.PodTemplateHashLabelKey, podTemplateSpecHash)
preRs.Spec.Selector = appsv2alpha2.CloneSelectorAndAddLabel(preRs.Spec.Selector, appsv2alpha2.PodTemplateHashLabelKey, podTemplateSpecHash)

currentRs, _ := getReplicaSetList(ctx, a.Client, instance)
if currentRs == nil {
return preRs
}

patchResult, _ := a.Patcher.Calculate(
currentRs.DeepCopy(),
preRs.DeepCopy(),
justCheckPodTemplate(),
)
if patchResult.IsEmpty() {
preRs.ObjectMeta = currentRs.ObjectMeta
preRs.Spec.Template.ObjectMeta = currentRs.Spec.Template.ObjectMeta
preRs.Spec.Selector = currentRs.Spec.Selector
return preRs
}
logger := log.FromContext(ctx)
logger.V(1).Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "patch", string(patchResult.Patch))

return preRs
}

func (a *addRepl) syncReplicaSet(ctx context.Context, instance *appsv2alpha2.EMQX) error {
rsList := getReplicaSetList(ctx, a.Client,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.ReplicantTemplate.Labels),
)
if len(rsList) <= 1 {
func (a *addRepl) sync(ctx context.Context, instance *appsv2alpha2.EMQX) error {
_, oldRsList := getReplicaSetList(ctx, a.Client, instance)
if len(oldRsList) == 0 {
return nil
}

old := rsList[0].DeepCopy()
eList := getEventList(ctx, a.Clientset, old)
oldest := oldRsList[0].DeepCopy()

if canBeScaledDown(instance, appsv2alpha2.Ready, eList) {
old.Spec.Replicas = pointer.Int32(old.Status.Replicas - 1)
if err := a.Client.Update(ctx, old); err != nil {
if pod := a.findCanBeDeletePod(ctx, instance, oldest); pod != nil {
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
// https://kubernetes.io/docs/concepts/workloads/controllers/replicaset/#pod-deletion-cost
pod.Annotations["controller.kubernetes.io/pod-deletion-cost"] = "-99999"
if err := a.Client.Patch(ctx, pod, client.MergeFrom(pod)); err != nil {
return emperror.Wrap(err, "failed patch pod deletion cost")
}

oldest.Spec.Replicas = pointer.Int32(oldest.Status.Replicas - 1)
if err := a.Client.Update(ctx, oldest); err != nil {
return emperror.Wrap(err, "failed to scale down old replicaSet")
}
return nil
}
return nil
}

func (a *addRepl) findCanBeDeletePod(ctx context.Context, instance *appsv2alpha2.EMQX, old *appsv1.ReplicaSet) *corev1.Pod {
if !canBeScaledDown(instance, appsv2alpha2.Ready, getEventList(ctx, a.Clientset, old)) {
return nil
}

type podSessionCount struct {
pod *corev1.Pod
edition string
session int64
}
var podSessionCountList []*podSessionCount

list := &corev1.PodList{}
_ = a.Client.List(ctx, list, client.InNamespace(old.Namespace), client.MatchingLabels(old.Spec.Selector.MatchLabels))

for _, node := range instance.Status.ReplicantNodesStatus.Nodes {
for _, pod := range list.Items {
host := strings.Split(node.Node[strings.Index(node.Node, "@")+1:], ":")[0]
if pod.Status.PodIP == host {
podSessionCountList = append(podSessionCountList, &podSessionCount{
pod: pod.DeepCopy(),
edition: node.Edition,
session: node.Session,
})
}
}
}

sort.Slice(podSessionCountList, func(i, j int) bool {
return podSessionCountList[i].session < podSessionCountList[j].session
})

if podSessionCountList[0].edition == "Enterprise" && podSessionCountList[0].session > 0 {
return nil
}

return podSessionCountList[0].pod
}

func generateReplicaSet(instance *appsv2alpha2.EMQX) *appsv1.ReplicaSet {
return &appsv1.ReplicaSet{
TypeMeta: metav1.TypeMeta{
Expand Down
Loading

0 comments on commit efd57aa

Please sign in to comment.