Skip to content

Commit

Permalink
refactor: refactor status machine
Browse files Browse the repository at this point in the history
  • Loading branch information
Rory-Z committed Jul 4, 2023
1 parent efd57aa commit 7b40bf8
Show file tree
Hide file tree
Showing 15 changed files with 674 additions and 514 deletions.
1 change: 0 additions & 1 deletion apis/apps/v2alpha2/emqx_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ type EMQXSpec struct {
//+kubebuilder:storageversion
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.replicantTemplate.spec.replicas,statuspath=.status.replicantNodeReplicas
//+kubebuilder:printcolumn:name="Image",type="string",JSONPath=".status.currentImage"
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.conditions[?(@.status==\"True\")].type"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

Expand Down
12 changes: 4 additions & 8 deletions apis/apps/v2alpha2/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ import (

// EMQXStatus defines the observed state of EMQX
type EMQXStatus struct {
// CurrentImage, indicates the image of the EMQX used to generate Pods in the
CurrentImage string `json:"currentImage,omitempty"`
// Represents the latest available observations of a EMQX Custom Resource current state.
Conditions []metav1.Condition `json:"conditions,omitempty"`

CoreNodesStatus EMQXNodesStatus `json:"CoreNodesStatus,omitempty"`
ReplicantNodesStatus *EMQXNodesStatus `json:"ReplicantNodesStatus,omitempty"`
CoreNodesStatus EMQXNodesStatus `json:"coreNodesStatus,omitempty"`
ReplicantNodesStatus *EMQXNodesStatus `json:"replicantNodesStatus,omitempty"`
}

type EMQXNodesStatus struct {
Expand Down Expand Up @@ -67,9 +65,10 @@ type EMQXNode struct {
const (
Initialized string = "Initialized"
CoreNodesProgressing string = "CoreNodesProgressing"
CodeNodesReady string = "CodeNodesReady"
CoreNodesReady string = "CoreNodesReady"
ReplicantNodesProgressing string = "ReplicantNodesProgressing"
ReplicantNodesReady string = "ReplicantNodesReady"
Available string = "Available"
Ready string = "Ready"
)

Expand Down Expand Up @@ -100,9 +99,6 @@ func (s *EMQXStatus) SetCondition(c metav1.Condition) {
c.LastTransitionTime = metav1.Now()
pos, _ := s.GetCondition(c.Type)
if pos >= 0 {
if s.Conditions[pos].Status == c.Status && !s.Conditions[pos].LastTransitionTime.IsZero() {
c.LastTransitionTime = s.Conditions[pos].LastTransitionTime
}
s.Conditions[pos] = c
} else {
s.Conditions = append(s.Conditions, c)
Expand Down
2 changes: 1 addition & 1 deletion apis/apps/v2alpha2/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestSetCondition(t *testing.T) {

assert.Equal(t, 1, len(status.Conditions))
assert.NotEmpty(t, c3.LastTransitionTime)
assert.Equal(t, c2.LastTransitionTime, c3.LastTransitionTime)
assert.NotEqual(t, c2.LastTransitionTime, c3.LastTransitionTime)
})
}

Expand Down
81 changes: 38 additions & 43 deletions config/crd/bases/apps.emqx.io_emqxes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6464,9 +6464,6 @@ spec:
statusReplicasPath: .status.replicantNodeReplicas
status: {}
- additionalPrinterColumns:
- jsonPath: .status.currentImage
name: Image
type: string
- jsonPath: .status.conditions[?(@.status=="True")].type
name: Status
type: string
Expand Down Expand Up @@ -12867,7 +12864,43 @@ spec:
type: object
status:
properties:
CoreNodesStatus:
conditions:
items:
properties:
lastTransitionTime:
format: date-time
type: string
message:
maxLength: 32768
type: string
observedGeneration:
format: int64
minimum: 0
type: integer
reason:
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
enum:
- "True"
- "False"
- Unknown
type: string
type:
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
type: array
coreNodesStatus:
properties:
collisionCount:
format: int32
Expand Down Expand Up @@ -12907,7 +12940,7 @@ spec:
format: int32
type: integer
type: object
ReplicantNodesStatus:
replicantNodesStatus:
properties:
collisionCount:
format: int32
Expand Down Expand Up @@ -12947,44 +12980,6 @@ spec:
format: int32
type: integer
type: object
conditions:
items:
properties:
lastTransitionTime:
format: date-time
type: string
message:
maxLength: 32768
type: string
observedGeneration:
format: int64
minimum: 0
type: integer
reason:
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
enum:
- "True"
- "False"
- Unknown
type: string
type:
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
type: array
currentImage:
type: string
type: object
type: object
served: true
Expand Down
43 changes: 32 additions & 11 deletions controllers/apps/v2alpha2/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

emperror "emperror.dev/errors"
"github.com/cisco-open/k8s-objectmatcher/patch"
appsv2alpha2 "github.com/emqx/emqx-operator/apis/apps/v2alpha2"
innerReq "github.com/emqx/emqx-operator/internal/requester"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -16,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -24,10 +26,10 @@ type addCore struct {
}

func (a *addCore) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _ innerReq.RequesterInterface) subResult {
sts := a.getNewStatefulSet(ctx, instance)
if sts.UID == "" {
_ = ctrl.SetControllerReference(instance, sts, a.Scheme)
if err := a.Handler.Create(sts); err != nil {
preSts := a.getNewStatefulSet(ctx, instance)
if preSts.UID == "" {
_ = ctrl.SetControllerReference(instance, preSts, a.Scheme)
if err := a.Handler.Create(preSts); err != nil {
if k8sErrors.IsAlreadyExists(emperror.Cause(err)) {
if instance.Status.CoreNodesStatus.CollisionCount == nil {
instance.Status.CoreNodesStatus.CollisionCount = pointer.Int32(0)
Expand All @@ -38,15 +40,34 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _
}
return subResult{err: emperror.Wrap(err, "failed to create statefulSet")}
}
instance.Status.SetCondition(metav1.Condition{
Type: appsv2alpha2.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewStatefulSet",
Message: "Create new statefulSet",
})
instance.Status.CoreNodesStatus.CurrentRevision = preSts.Labels[appsv2alpha2.PodTemplateHashLabelKey]
_ = a.Client.Status().Update(ctx, instance)
} else {
if err := a.Handler.CreateOrUpdate(sts); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update statefulSet")}
}
}
storageSts := &appsv1.StatefulSet{}
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(preSts), storageSts)
patchResult, _ := a.Patcher.Calculate(storageSts, preSts,
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
)
if !patchResult.IsEmpty() {
logger := log.FromContext(ctx)
logger.V(1).Info("got different statefulSet for EMQX core nodes, will update statefulSet", "patch", string(patchResult.Patch))

if instance.Status.CoreNodesStatus.CurrentRevision != sts.Labels[appsv2alpha2.PodTemplateHashLabelKey] {
instance.Status.CoreNodesStatus.CurrentRevision = sts.Labels[appsv2alpha2.PodTemplateHashLabelKey]
_ = a.Client.Status().Update(ctx, instance)
_ = a.Handler.Update(preSts)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2alpha2.CoreNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewStatefulSet",
Message: "Create new statefulSet",
})
_ = a.Client.Status().Update(ctx, instance)
}
}

if err := a.sync(ctx, instance); err != nil {
Expand Down
17 changes: 16 additions & 1 deletion controllers/apps/v2alpha2/add_emqx_core_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
{
Type: appsv2alpha2.CodeNodesReady,
Type: appsv2alpha2.CoreNodesReady,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
Expand Down Expand Up @@ -98,6 +98,13 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
}).Should(ConsistOf(
WithTransform(func(s appsv1.StatefulSet) int32 { return *s.Spec.Replicas }, Equal(*instance.Spec.CoreTemplate.Spec.Replicas)),
))

Eventually(func() *appsv2alpha2.EMQX {
_ = k8sClient.Get(ctx, client.ObjectKeyFromObject(instance), instance)
return instance
}).Should(WithTransform(
func(emqx *appsv2alpha2.EMQX) string { return emqx.Status.GetLastTrueCondition().Type }, Equal(appsv2alpha2.CoreNodesProgressing),
))
})
})

Expand All @@ -120,6 +127,13 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
WithTransform(func(s appsv1.StatefulSet) string { return s.Spec.Template.Spec.Containers[0].Image }, Equal(emqx.Spec.Image)),
WithTransform(func(s appsv1.StatefulSet) string { return s.Spec.Template.Spec.Containers[0].Image }, Equal(instance.Spec.Image)),
))

Eventually(func() *appsv2alpha2.EMQX {
_ = k8sClient.Get(ctx, client.ObjectKeyFromObject(instance), instance)
return instance
}).Should(WithTransform(
func(emqx *appsv2alpha2.EMQX) string { return emqx.Status.GetLastTrueCondition().Type }, Equal(appsv2alpha2.CoreNodesProgressing),
))
})
})

Expand Down Expand Up @@ -155,6 +169,7 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
instance.Spec.UpdateStrategy.InitialDelaySeconds = int32(0)
instance.Spec.UpdateStrategy.EvacuationStrategy.WaitTakeover = int32(0)
})

It("should scale down", func() {
for *old.Spec.Replicas > 0 {
preReplicas := *old.Spec.Replicas
Expand Down
45 changes: 32 additions & 13 deletions controllers/apps/v2alpha2/add_emqx_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

emperror "emperror.dev/errors"
"github.com/cisco-open/k8s-objectmatcher/patch"
appsv2alpha2 "github.com/emqx/emqx-operator/apis/apps/v2alpha2"
innerReq "github.com/emqx/emqx-operator/internal/requester"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -26,14 +27,14 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _
if instance.Spec.ReplicantTemplate == nil {
return subResult{}
}
if !instance.Status.IsConditionTrue(appsv2alpha2.CodeNodesReady) {
if !instance.Status.IsConditionTrue(appsv2alpha2.CoreNodesReady) {
return subResult{}
}

rs := a.getNewReplicaSet(ctx, instance)
if rs.UID == "" {
_ = ctrl.SetControllerReference(instance, rs, a.Scheme)
if err := a.Handler.Create(rs); err != nil {
preRs := a.getNewReplicaSet(ctx, instance)
if preRs.UID == "" {
_ = ctrl.SetControllerReference(instance, preRs, a.Scheme)
if err := a.Handler.Create(preRs); err != nil {
if k8sErrors.IsAlreadyExists(emperror.Cause(err)) {
if instance.Status.ReplicantNodesStatus.CollisionCount == nil {
instance.Status.ReplicantNodesStatus.CollisionCount = pointer.Int32(0)
Expand All @@ -44,16 +45,34 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _
}
return subResult{err: emperror.Wrap(err, "failed to create replicaSet")}
}

instance.Status.SetCondition(metav1.Condition{
Type: appsv2alpha2.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewReplicaSet",
Message: "Create new replicaSet",
})
instance.Status.ReplicantNodesStatus.CurrentRevision = preRs.Labels[appsv2alpha2.PodTemplateHashLabelKey]
_ = a.Client.Status().Update(ctx, instance)
} else {
if err := a.Handler.CreateOrUpdate(rs); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update replicaSet")}
}
}
storageRs := &appsv1.ReplicaSet{}
_ = a.Client.Get(ctx, client.ObjectKeyFromObject(preRs), storageRs)
patchResult, _ := a.Patcher.Calculate(storageRs, preRs,
patch.IgnoreStatusFields(),
patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
)
if !patchResult.IsEmpty() {
logger := log.FromContext(ctx)
logger.V(1).Info("got different statefulSet for EMQX core nodes, will update statefulSet", "patch", string(patchResult.Patch))

if instance.Status.ReplicantNodesStatus.CurrentRevision != rs.Labels[appsv2alpha2.PodTemplateHashLabelKey] {
instance.Status.ReplicantNodesStatus.CurrentRevision = rs.Labels[appsv2alpha2.PodTemplateHashLabelKey]
_ = a.Client.Status().Update(ctx, instance)
_ = a.Handler.Update(preRs)
instance.Status.SetCondition(metav1.Condition{
Type: appsv2alpha2.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Reason: "CreateNewReplicaSet",
Message: "Create new replicaSet",
})
_ = a.Client.Status().Update(ctx, instance)
}
}

if err := a.sync(ctx, instance); err != nil {
Expand Down
18 changes: 16 additions & 2 deletions controllers/apps/v2alpha2/add_emqx_repl_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
{
Type: appsv2alpha2.CodeNodesReady,
Type: appsv2alpha2.CoreNodesReady,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
Expand Down Expand Up @@ -85,7 +85,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {

Context("core nodes is not ready", func() {
JustBeforeEach(func() {
instance.Status.RemoveCondition(appsv2alpha2.CodeNodesReady)
instance.Status.RemoveCondition(appsv2alpha2.CoreNodesReady)
})

It("should do nothing", func() {
Expand Down Expand Up @@ -174,6 +174,13 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
}).Should(ConsistOf(
WithTransform(func(rs appsv1.ReplicaSet) int32 { return *rs.Spec.Replicas }, Equal(*instance.Spec.ReplicantTemplate.Spec.Replicas)),
))

Eventually(func() *appsv2alpha2.EMQX {
_ = k8sClient.Get(ctx, client.ObjectKeyFromObject(instance), instance)
return instance
}).Should(WithTransform(
func(emqx *appsv2alpha2.EMQX) string { return emqx.Status.GetLastTrueCondition().Type }, Equal(appsv2alpha2.ReplicantNodesProgressing),
))
})
})

Expand All @@ -196,6 +203,13 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
WithTransform(func(rs appsv1.ReplicaSet) string { return rs.Spec.Template.Spec.Containers[0].Image }, Equal(emqx.Spec.Image)),
WithTransform(func(rs appsv1.ReplicaSet) string { return rs.Spec.Template.Spec.Containers[0].Image }, Equal(instance.Spec.Image)),
))

Eventually(func() *appsv2alpha2.EMQX {
_ = k8sClient.Get(ctx, client.ObjectKeyFromObject(instance), instance)
return instance
}).Should(WithTransform(
func(emqx *appsv2alpha2.EMQX) string { return emqx.Status.GetLastTrueCondition().Type }, Equal(appsv2alpha2.ReplicantNodesProgressing),
))
})
})

Expand Down
Loading

0 comments on commit 7b40bf8

Please sign in to comment.