Skip to content

Commit

Permalink
feat(v2alpha2): create muilt sts for update core node
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Jun 25, 2023
1 parent faea772 commit 78f7a31
Show file tree
Hide file tree
Showing 17 changed files with 639 additions and 237 deletions.
4 changes: 2 additions & 2 deletions apis/apps/v2alpha2/emqx_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ type EMQXReplicantTemplateSpec struct {
// Replicas is the desired number of replicas of the given Template.
// These are replicas in the sense that they are instantiations of the
// same Template, but individual replicas also have a consistent identity.
// Defaults to 3.
//+kubebuilder:default:=3
// Defaults to 2.
//+kubebuilder:default:=2
Replicas *int32 `json:"replicas,omitempty"`
// Entrypoint array. Not executed within a shell.
// The container image's ENTRYPOINT is used if this is not provided.
Expand Down
54 changes: 54 additions & 0 deletions apis/apps/v2alpha2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,63 @@ import (
// "github.com/gurkankaymak/hocon"
hocon "github.com/rory-z/go-hocon"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// Clones the given selector and returns a new selector with the given key and value added.
// Returns the given selector, if labelKey is empty.
func CloneSelectorAndAddLabel(selector *metav1.LabelSelector, labelKey, labelValue string) *metav1.LabelSelector {
if labelKey == "" {
// Don't need to add a label.
return selector
}

// Clone.
newSelector := new(metav1.LabelSelector)

// TODO(madhusudancs): Check if you can use deepCopy_extensions_LabelSelector here.
newSelector.MatchLabels = make(map[string]string)
if selector.MatchLabels != nil {
for key, val := range selector.MatchLabels {
newSelector.MatchLabels[key] = val
}
}
newSelector.MatchLabels[labelKey] = labelValue

if selector.MatchExpressions != nil {
newMExps := make([]metav1.LabelSelectorRequirement, len(selector.MatchExpressions))
for i, me := range selector.MatchExpressions {
newMExps[i].Key = me.Key
newMExps[i].Operator = me.Operator
if me.Values != nil {
newMExps[i].Values = make([]string, len(me.Values))
copy(newMExps[i].Values, me.Values)
} else {
newMExps[i].Values = nil
}
}
newSelector.MatchExpressions = newMExps
} else {
newSelector.MatchExpressions = nil
}

return newSelector
}

// AddLabelToSelector returns a selector with the given key and value added to the given selector's MatchLabels.
func AddLabelToSelector(selector *metav1.LabelSelector, labelKey, labelValue string) *metav1.LabelSelector {
if labelKey == "" {
// Don't need to add a label.
return selector
}
if selector.MatchLabels == nil {
selector.MatchLabels = make(map[string]string)
}
selector.MatchLabels[labelKey] = labelValue
return selector
}

// Clones the given map and returns a new map with the given key and value added.
// Returns the given map, if labelKey is empty.
func CloneAndAddLabel(labels map[string]string, labelKey, labelValue string) map[string]string {
Expand Down
4 changes: 2 additions & 2 deletions config/crd/bases/apps.emqx.io_emqxes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9317,7 +9317,7 @@ spec:
type: integer
type: object
replicas:
default: 3
default: 2
format: int32
type: integer
resources:
Expand Down Expand Up @@ -12720,7 +12720,7 @@ spec:
type: integer
type: object
replicas:
default: 3
default: 2
format: int32
type: integer
resources:
Expand Down
91 changes: 90 additions & 1 deletion controllers/apps/v2alpha2/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
innerReq "github.com/emqx/emqx-operator/internal/requester"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -19,13 +22,99 @@ type addCore struct {
}

func (a *addCore) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _ innerReq.RequesterInterface) subResult {
sts := generateStatefulSet(instance)
sts, collisionCount := a.getNewStatefulSet(ctx, instance)
if collisionCount != instance.Status.CoreNodesStatus.CollisionCount {
instance.Status.CoreNodesStatus.CollisionCount = collisionCount
_ = a.Client.Status().Update(ctx, instance)
}

if err := a.CreateOrUpdateList(instance, a.Scheme, []client.Object{sts}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to create or update statefulSet")}
}

if err := a.syncStatefulSet(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to sync replicaSet")}
}

return subResult{}
}

func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2alpha2.EMQX) (*appsv1.StatefulSet, *int32) {
preSts := generateStatefulSet(instance)

list := &appsv1.StatefulSetList{}
_ = a.Client.List(ctx, list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.CoreTemplate.Labels),
)

for _, sts := range list.Items {
patchResult, _ := a.Patcher.Calculate(
sts.DeepCopy(),
preSts.DeepCopy(),
justCheckPodTemplateSpec(),
)
if patchResult.IsEmpty() {
preSts.ObjectMeta = *sts.ObjectMeta.DeepCopy()
preSts.Spec.Template.ObjectMeta = *sts.Spec.Template.ObjectMeta.DeepCopy()
preSts.Spec.Selector = sts.Spec.Selector.DeepCopy()
return preSts, instance.Status.CoreNodesStatus.CollisionCount
}
}

var collisionCount *int32
var staName, podTemplateSpecHash string
podTemplate := preSts.Spec.Template.DeepCopy()

collisionCount = instance.Status.CoreNodesStatus.CollisionCount
if collisionCount == nil {
collisionCount = new(int32)
}

// Do-while loop
for {
podTemplateSpecHash = computeHash(podTemplate.DeepCopy(), collisionCount)

staName = instance.Spec.CoreTemplate.Name + "-" + podTemplateSpecHash
err := a.Client.Get(context.TODO(), types.NamespacedName{
Namespace: instance.Namespace,
Name: staName,
}, &appsv1.StatefulSet{})
if k8sErrors.IsNotFound(err) {
break
}
*collisionCount++
}

preSts.Name = staName
preSts.Labels = appsv2alpha2.CloneAndAddLabel(preSts.Labels, appsv1.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
preSts.Spec.Template.Labels = appsv2alpha2.CloneAndAddLabel(preSts.Spec.Template.Labels, appsv1.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
preSts.Spec.Selector = appsv2alpha2.CloneSelectorAndAddLabel(preSts.Spec.Selector, appsv1.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
return preSts, collisionCount
}

func (a *addCore) syncStatefulSet(ctx context.Context, instance *appsv2alpha2.EMQX) error {
stsList := getStateFulSetList(ctx, a.Client,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.CoreTemplate.Labels),
)
if len(stsList) <= 1 {
return nil
}

old := stsList[0].DeepCopy()
eList := getEventList(ctx, a.Clientset, old)

if canBeScaledDown(instance, appsv2alpha2.CodeNodesReady, eList) {
old.Spec.Replicas = pointer.Int32Ptr(old.Status.Replicas - 1)
if err := a.Client.Update(ctx, old); err != nil {
return emperror.Wrap(err, "failed to scale down old replicaSet")
}
return nil
}
return nil
}

func generateStatefulSet(instance *appsv2alpha2.EMQX) *appsv1.StatefulSet {
podTemplate := corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Expand Down
153 changes: 153 additions & 0 deletions controllers/apps/v2alpha2/add_emqx_core_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package v2alpha2

import (
"context"
"errors"

appsv2alpha2 "github.com/emqx/emqx-operator/apis/apps/v2alpha2"
innerReq "github.com/emqx/emqx-operator/internal/requester"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
var a *addCore
var req *innerReq.Requester = &innerReq.Requester{}
var ns *corev1.Namespace = &corev1.Namespace{}

var instance *appsv2alpha2.EMQX = new(appsv2alpha2.EMQX)

BeforeEach(func() {
a = &addCore{emqxReconciler}

ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "controller-v2alpha2-add-emqx-core-test",
Labels: map[string]string{
"test": "e2e",
},
},
}

instance = emqx.DeepCopy()
instance.Namespace = ns.Name
})

It("create namespace", func() {
Expect(k8sClient.Create(context.TODO(), ns)).Should(Succeed())
})

It("should create statefulSet", func() {
Eventually(a.reconcile(ctx, instance, req)).Should(Equal(subResult{}))
Eventually(func() []appsv1.StatefulSet {
list := &appsv1.StatefulSetList{}
_ = k8sClient.List(ctx, list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.CoreTemplate.Labels),
)
return list.Items
}).Should(And(
HaveLen(1),
ContainElements(
WithTransform(func(s appsv1.StatefulSet) string { return s.Spec.Template.Spec.Containers[0].Image }, Equal(instance.Spec.Image)),
),
))
})

Context("change replicas count", func() {
JustBeforeEach(func() {
instance.Spec.CoreTemplate.Spec.Replicas = pointer.Int32(4)
})

It("should update statefulSet", func() {
Eventually(a.reconcile(ctx, instance, req)).Should(Equal(subResult{}))
Eventually(func() []appsv1.StatefulSet {
list := &appsv1.StatefulSetList{}
_ = k8sClient.List(ctx, list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.CoreTemplate.Labels),
)
return list.Items
}).Should(ConsistOf(
WithTransform(func(s appsv1.StatefulSet) int32 { return *s.Spec.Replicas }, Equal(*instance.Spec.CoreTemplate.Spec.Replicas)),
))
})
})

Context("change image", func() {
JustBeforeEach(func() {
instance.Spec.Image = "emqx/emqx"
instance.Spec.UpdateStrategy.InitialDelaySeconds = int32(999999999)
})

It("should create new statefulSet", func() {
Eventually(a.reconcile(ctx, instance, req)).Should(Equal(subResult{}))
Eventually(func() []appsv1.StatefulSet {
list := &appsv1.StatefulSetList{}
_ = k8sClient.List(ctx, list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.CoreTemplate.Labels),
)
return list.Items
}).WithTimeout(timeout).WithPolling(interval).Should(ConsistOf(
WithTransform(func(s appsv1.StatefulSet) string { return s.Spec.Template.Spec.Containers[0].Image }, Equal(emqx.Spec.Image)),
WithTransform(func(s appsv1.StatefulSet) string { return s.Spec.Template.Spec.Containers[0].Image }, Equal(instance.Spec.Image)),
))
})
})

Context("can be scale down", func() {
var old *appsv1.StatefulSet = new(appsv1.StatefulSet)
var realReplicas int32

JustBeforeEach(func() {
Eventually(func() error {
list := getStateFulSetList(ctx, a.Client,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Spec.CoreTemplate.Labels),
)
if len(list) == 0 {
return errors.New("not found")
}
old = list[0].DeepCopy()
return nil
}).Should(Succeed())

realReplicas = *old.Spec.Replicas
instance.Spec.UpdateStrategy.InitialDelaySeconds = int32(0)
instance.Spec.UpdateStrategy.EvacuationStrategy.WaitTakeover = int32(0)
})
It("should scale down", func() {
for realReplicas > 1 {
//mock statefulSet status
old.Status.Replicas = realReplicas
old.Status.ReadyReplicas = realReplicas
Expect(k8sClient.Status().Update(ctx, old)).Should(Succeed())
Eventually(func() *appsv1.StatefulSet {
_ = k8sClient.Get(ctx, client.ObjectKeyFromObject(old), old)
return old
}).WithTimeout(timeout).WithPolling(interval).Should(And(
WithTransform(func(s *appsv1.StatefulSet) int32 { return s.Status.Replicas }, Equal(realReplicas)),
WithTransform(func(s *appsv1.StatefulSet) int32 { return s.Status.ReadyReplicas }, Equal(realReplicas)),
))

// retry it because update the statefulSet maybe will conflict
Eventually(a.reconcile(ctx, instance, req)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
_ = k8sClient.Get(ctx, client.ObjectKeyFromObject(old), old)
Expect(*old.Spec.Replicas).Should(Equal(realReplicas - 1))

realReplicas--
}
})
})

It("delete namespace", func() {
Expect(k8sClient.Delete(ctx, ns)).Should(Succeed())
})
})
Loading

0 comments on commit 78f7a31

Please sign in to comment.