Commit 74d4113

fix: don't release the burst replica until unreadyReplicas is less than maxSurge

Signed-off-by: congcongke <[email protected]>

1 parent 66bcbc7 commit 74d4113

5 files changed: +234 −28 lines


pkg/controllers/leaderworkerset_controller.go

Lines changed: 6 additions & 5 deletions
@@ -271,11 +271,12 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
 
 	// wantReplicas calculates the final replicas if needed.
 	wantReplicas := func(unreadyReplicas int32) int32 {
-		if unreadyReplicas <= int32(maxSurge) {
-			// When we have n unready replicas and n bursted replicas, we should
-			// start to release the burst replica gradually for the accommodation of
-			// the unready ones.
-			finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
+		if unreadyReplicas < int32(maxSurge) {
+			// When the unready replicas less than maxSurge, we should start to release
+			// the burst replica gradually for the accommodation of the unready ones.
+			// Actually we should keep the burst replicas when the unready replicas is
+			// equal to maxSurge, because the rolling update is not completed yet.
+			finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas))
 			r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
 			return finalReplicas
 		}
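For readers skimming the hunk, here is a minimal, self-contained sketch of the changed condition (not the controller code itself): nonZero stands in for utils.NonZeroValue, and the branch for unreadyReplicas >= maxSurge is assumed here to keep the full surge, which lies outside the hunk shown above.

package main

import "fmt"

// nonZero mimics utils.NonZeroValue: negative values are clamped to zero.
func nonZero(v int32) int32 {
	if v < 0 {
		return 0
	}
	return v
}

// wantReplicas mirrors the patched condition: the burst (surge) replicas are
// only released once fewer than maxSurge replicas remain unready.
func wantReplicas(unreadyReplicas, lwsReplicas, maxSurge int32) int32 {
	if unreadyReplicas < maxSurge {
		return lwsReplicas + nonZero(unreadyReplicas)
	}
	// Assumed behavior for the unchanged branch: keep the full surge.
	return lwsReplicas + maxSurge
}

func main() {
	// With lwsReplicas=4 and maxSurge=4, the full surge (8) is now held while
	// 4 replicas are still unready; the old `<=` condition already dropped to 7.
	for unready := int32(4); unready >= 0; unready-- {
		fmt.Printf("unready=%d -> replicas=%d\n", unready, wantReplicas(unready, 4, 4))
	}
}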

pkg/controllers/leaderworkerset_controller_test.go

Lines changed: 165 additions & 3 deletions
@@ -24,15 +24,17 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	appsapplyv1 "k8s.io/client-go/applyconfigurations/apps/v1"
 	coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
 	metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/utils/ptr"
-
-	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
-
+	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
+	"sigs.k8s.io/lws/client-go/clientset/versioned/scheme"
 	revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
 	"sigs.k8s.io/lws/test/wrappers"
 )
@@ -536,3 +538,163 @@ func TestSetCondition(t *testing.T) {
 		})
 	}
 }
+
+func TestLeaderWorkerSetReconciler_rollingUpdateParameters(t *testing.T) {
+	type fields struct {
+		Client client.Client
+		Scheme *runtime.Scheme
+		Record record.EventRecorder
+	}
+	type args struct {
+		ctx                    context.Context
+		lws                    *leaderworkerset.LeaderWorkerSet
+		sts                    *appsv1.StatefulSet
+		revisionKey            string
+		leaderWorkerSetUpdated bool
+	}
+	tests := []struct {
+		name           string
+		fields         fields
+		args           args
+		wantPartitions int32
+		wantReplicas   int32
+		wantErr        bool
+	}{
+		{
+			name: "lws is update without replicas, increase replica",
+			fields: fields{
+				Client: fake.NewClientBuilder().Build(),
+				Scheme: scheme.Scheme,
+				Record: record.NewFakeRecorder(100),
+			},
+			args: func() args {
+				return args{
+					ctx:                    context.Background(),
+					lws:                    wrappers.BuildLeaderWorkerSet("default").Replica(1).MaxSurge(1).Size(1).MaxUnavailable(0).Obj(),
+					sts:                    wrappers.MakeLeaderStatefulSetWithLabels("test-sample", "default", 1),
+					revisionKey:            "new",
+					leaderWorkerSetUpdated: true,
+				}
+			}(),
+			wantPartitions: 1,
+			wantReplicas:   2,
+			wantErr:        false,
+		},
+		{
+			name: "lws is updated without replicas, new pod running, decrease the partition",
+			fields: func() fields {
+				pod1 := wrappers.MakePodWithLabelsAndStatus("test-sample", "0", "0", "default", 1, corev1.PodRunning)
+				pod1.Labels[leaderworkerset.RevisionKey] = "old"
+				pod2 := wrappers.MakePodWithLabelsAndStatus("test-sample", "1", "0", "default", 1, corev1.PodRunning)
+				pod2.Labels[leaderworkerset.RevisionKey] = "new"
+				return fields{
+					Client: fake.NewClientBuilder().WithObjects(
+						pod1,
+						pod2,
+						wrappers.MakeLeaderStatefulSetWithLabels("test-sample", "default", 2),
+					).Build(),
+					Scheme: scheme.Scheme,
+					Record: record.NewFakeRecorder(100),
+				}
+			}(),
+			args: func() args {
+				leaderSts := wrappers.MakeLeaderStatefulSetWithLabels("test-sample", "default", 2)
+				leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition = ptr.To(int32(1))
+				leaderSts.Annotations[leaderworkerset.ReplicasAnnotationKey] = "1"
+				return args{
+					ctx:                    context.Background(),
+					lws:                    wrappers.BuildLeaderWorkerSet("default").Replica(1).MaxSurge(1).Size(1).MaxUnavailable(0).Obj(),
+					sts:                    leaderSts,
+					revisionKey:            "new",
+					leaderWorkerSetUpdated: false,
+				}
+			}(),
+			wantPartitions: 0,
+			wantReplicas:   2,
+			wantErr:        false,
+		},
+		{
+			name: "lws is updated without replicas, old pod updated, decrease the replica",
+			fields: func() fields {
+				pod1 := wrappers.MakePodWithLabelsAndStatus("test-sample", "0", "0", "default", 1, corev1.PodRunning)
+				pod1.Labels[leaderworkerset.RevisionKey] = "new"
+				pod2 := wrappers.MakePodWithLabelsAndStatus("test-sample", "1", "0", "default", 1, corev1.PodRunning)
+				pod2.Labels[leaderworkerset.RevisionKey] = "new"
+				return fields{
+					Client: fake.NewClientBuilder().WithObjects(
+						pod1,
+						pod2,
+						wrappers.MakeLeaderStatefulSetWithLabels("test-sample", "default", 2),
+					).Build(),
+					Scheme: scheme.Scheme,
+					Record: record.NewFakeRecorder(100),
+				}
+			}(),
+			args: func() args {
+				leaderSts := wrappers.MakeLeaderStatefulSetWithLabels("test-sample", "default", 2)
+				leaderSts.Annotations[leaderworkerset.ReplicasAnnotationKey] = "1"
+				return args{
+					ctx:                    context.Background(),
+					lws:                    wrappers.BuildLeaderWorkerSet("default").Replica(1).MaxSurge(1).Size(1).MaxUnavailable(0).Obj(),
+					sts:                    leaderSts,
+					revisionKey:            "new",
+					leaderWorkerSetUpdated: false,
+				}
+			}(),
+			wantPartitions: 0,
+			wantReplicas:   1,
+			wantErr:        false,
+		},
+
+		{
+			name: "lws is updated with replicas from 1 to 2, increase replicas",
+			fields: func() fields {
+				pod1 := wrappers.MakePodWithLabelsAndStatus("test-sample", "0", "0", "default", 1, corev1.PodRunning)
+				pod1.Labels[leaderworkerset.RevisionKey] = "not-updated"
+				return fields{
+					Client: fake.NewClientBuilder().WithObjects(
+						pod1,
+						wrappers.MakeLeaderStatefulSetWithLabels("test-sample", "default", 1),
+					).Build(),
+					Scheme: scheme.Scheme,
+					Record: record.NewFakeRecorder(100),
+				}
+			}(),
+			args: func() args {
+				leaderSts := wrappers.MakeLeaderStatefulSetWithLabels("test-sample", "default", 1)
+				leaderSts.Annotations[leaderworkerset.ReplicasAnnotationKey] = "1"
+				return args{
+					ctx:                    context.Background(),
+					lws:                    wrappers.BuildLeaderWorkerSet("default").Replica(2).MaxSurge(1).Size(1).MaxUnavailable(0).Obj(),
+					sts:                    leaderSts,
+					revisionKey:            "not-updated",
+					leaderWorkerSetUpdated: false,
+				}
+			}(),
+			wantPartitions: 0,
+			wantReplicas:   2,
+			wantErr:        false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			r := &LeaderWorkerSetReconciler{
+				Client: tt.fields.Client,
+				Scheme: tt.fields.Scheme,
+				Record: tt.fields.Record,
+			}
+			partitions, replicas, err := r.rollingUpdateParameters(tt.args.ctx, tt.args.lws, tt.args.sts, tt.args.revisionKey, tt.args.leaderWorkerSetUpdated)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("LeaderWorkerSetReconciler.rollingUpdateParameters() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if partitions != tt.wantPartitions {
+				t.Errorf("LeaderWorkerSetReconciler.rollingUpdateParameters() partitions = %v, want %v", partitions, tt.wantPartitions)
+			}
+			if replicas != tt.wantReplicas {
+				t.Errorf("LeaderWorkerSetReconciler.rollingUpdateParameters() replicas = %v, want %v", replicas, tt.wantReplicas)
+			}
+		})
+	}
+}
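Read against the patched wantReplicas logic, the expectations line up: in the first case the LWS (Replica(1), MaxSurge(1)) has just been updated, so the controller surges to 1 + 1 = 2 replicas and holds the partition at 1 until the surge replica is ready; in the third case both pods already carry the new revision key, so nothing is unready and the surge is released back to 1 replica with partition 0.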

test/e2e/e2e_test.go

Lines changed: 4 additions & 4 deletions
@@ -140,7 +140,7 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 			testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
 
 			// Happen during rolling update.
-			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 7)
+			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 8)
 
 			// Rolling update completes.
 			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
@@ -186,7 +186,7 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 			testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
 
 			// Happen during rolling update.
-			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 7)
+			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 8)
 
 			// Rolling update completes.
 			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
@@ -245,8 +245,8 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 
 			testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
 
-			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 7)
-			testing.ExpectValidServices(ctx, k8sClient, lws, 7)
+			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 8)
+			testing.ExpectValidServices(ctx, k8sClient, lws, 8)
 			// Rolling update completes.
 			testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
 			testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
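The bump from 7 to 8 is the visible effect of the controller change. Assuming these e2e cases update an LWS with 4 replicas and a surge equal to 4 (consistent with the final expectation of 4 leaders), the old formula lwsReplicas + NonZeroValue(unreadyReplicas-1) already gave 4 + (4-1) = 7 at the point where all 4 replicas were still unready, whereas the patched condition keeps the full burst of 4 + 4 = 8 until fewer than maxSurge replicas are unready.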

test/integration/controllers/leaderworkerset_test.go

Lines changed: 19 additions & 16 deletions
@@ -1041,22 +1041,22 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 			// Rolling update index-1 replica.
 			lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 				testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-1", lws)
-				// Reclaim the replica.
-				testing.DeleteLeaderPod(ctx, k8sClient, lws, 4, 5)
 			},
 			checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
+				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 5)
 				testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 				testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 				testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
 				testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
-				testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 4, 3)
+				testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 5, 4)
 			},
 		},
 		{
 			// Rolling update index-0 replica.
 			lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 				testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-0", lws)
+				// Reclaim the replica.
+				testing.DeleteLeaderPod(ctx, k8sClient, lws, 4, 5)
 			},
 			checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
 				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
@@ -1238,10 +1238,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 				var leaderSts appsv1.StatefulSet
 				testing.GetLeaderStatefulset(ctx, lws, k8sClient, &leaderSts)
 				// Create leader pod for maxSurge.
-				gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 2, 3)).To(gomega.Succeed())
+				gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 2, 4)).To(gomega.Succeed())
 			},
 			checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 3)
+				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
 				testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 				testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 				testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
@@ -1251,7 +1251,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 		{
 			// Set all groups to ready.
 			lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
-				testing.SetPodGroupsToReady(ctx, k8sClient, lws, 3)
+				testing.SetPodGroupsToReady(ctx, k8sClient, lws, 4)
 			},
 			checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
 				testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
@@ -1373,52 +1373,55 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 					return k8sClient.Update(ctx, &leaderworkerset)
 				}, testing.Timeout, testing.Interval).Should(gomega.Succeed())
 				testing.DeleteLeaderPod(ctx, k8sClient, lws, 4, 8)
-				// Reclaim the last replica.
-				testing.DeleteLeaderPod(ctx, k8sClient, lws, 3, 4)
 			},
 			checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 3)
+				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
 				testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 				testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 				testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
 				testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 1)
-				testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 0)
+				testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 4, 0)
 			},
 		},
 		{
 			// Rolling update index-2 replica.
 			lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 				testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-2", lws)
+				testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-3", lws)
+				// Reclaim the last replica.
+				// testing.DeleteLeaderPod(ctx, k8sClient, lws, 3, 4)
 			},
 			checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 3)
+				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
 				testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 				testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 				testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
 				testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
-				testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 1)
+				testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 4, 2)
 			},
 		},
 		{
 			// Rolling update index-1 replica.
 			lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 				testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-1", lws)
 				// Reclaim the last replica.
-				testing.DeleteLeaderPod(ctx, k8sClient, lws, 2, 3)
+				testing.DeleteLeaderPod(ctx, k8sClient, lws, 3, 4)
 			},
 			checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
+				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 3)
 				testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 				testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 				testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
 				testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
-				testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 2, 1)
+				testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 2)
 			},
 		},
 		{
 			// Rolling update index-0 replica.
 			lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 				testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-0", lws)
+				// Reclaim the last replica.
+				testing.DeleteLeaderPod(ctx, k8sClient, lws, 2, 3)
 			},
 			checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
 				testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
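The integration-test shifts follow the same pattern: the surge leader pod now survives one step longer (expected leader counts move from 4 to 5, 3 to 4, and 2 to 3 in the intermediate steps), and the explicit DeleteLeaderPod reclaim moves to the following step, since the burst replica is released only once fewer than maxSurge replicas remain unready.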

test/wrappers/wrappers.go

Lines changed: 40 additions & 0 deletions
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"strconv"
 
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -197,6 +198,45 @@ func MakePodWithLabels(setName, groupIndex, workerIndex, namespace string, size
 	}
 }
 
+func MakePodWithLabelsAndStatus(setName, groupIndex, workerIndex, namespace string, size int, status corev1.PodPhase) *corev1.Pod {
+	pod := MakePodWithLabels(setName, groupIndex, workerIndex, namespace, size)
+	pod.Status.Phase = status
+
+	if status == corev1.PodRunning {
+		pod.Status.Conditions = []corev1.PodCondition{
+			{
+				Type:   corev1.PodReady,
+				Status: corev1.ConditionTrue,
+			},
+		}
+	}
+	return pod
+}
+
+func MakeLeaderStatefulSetWithLabels(setName, namespace string, replica int) *appsv1.StatefulSet {
+	return &appsv1.StatefulSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      setName,
+			Namespace: namespace,
+			Labels: map[string]string{
+				leaderworkerset.SetNameLabelKey: setName,
+			},
+			Annotations: map[string]string{
+				leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(replica),
+			},
+		},
+		Spec: appsv1.StatefulSetSpec{
+			Replicas: ptr.To(int32(replica)),
+			UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
+				Type: appsv1.RollingUpdateStatefulSetStrategyType,
+				RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
+					Partition: ptr.To[int32](0),
+				},
+			},
+		},
+	}
+}
+
 func MakeWorkerPodSpec() corev1.PodSpec {
 	return corev1.PodSpec{
 		Containers: []corev1.Container{
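These two helpers are what the new unit test feeds into the fake client. A minimal usage sketch follows; the package and function names here (controllers_test, exampleFixture) are illustrative and not part of the commit.

package controllers_test

import (
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
	"sigs.k8s.io/lws/test/wrappers"
)

// exampleFixture seeds a fake client the way the new unit test does: a
// Running/Ready leader pod carrying a revision label, plus a leader
// StatefulSet whose replicas annotation matches its spec.
func exampleFixture() {
	pod := wrappers.MakePodWithLabelsAndStatus("test-sample", "0", "0", "default", 1, corev1.PodRunning)
	pod.Labels[leaderworkerset.RevisionKey] = "new"
	sts := wrappers.MakeLeaderStatefulSetWithLabels("test-sample", "default", 1)
	_ = fake.NewClientBuilder().WithObjects(pod, sts).Build()
}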
