Commit 6237936

Add a safety check before changing coordinators (#2373)
* Add a safety check before changing coordinators
1 parent 32ebdd3 commit 6237936

File tree

11 files changed: 704 additions, 43 deletions

controllers/change_coordinators.go

Lines changed: 15 additions & 0 deletions
```diff
@@ -25,6 +25,7 @@ import (
 
 	"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/coordinator"
 	"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/locality"
+	"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbstatus"
 	"github.com/go-logr/logr"
 
 	corev1 "k8s.io/api/core/v1"
@@ -94,6 +95,20 @@ func (c changeCoordinators) reconcile(
 		return nil
 	}
 
+	// Perform safety checks before changing coordinators. The minimum uptime should reduce the coordinator changes
+	// if a process is down for a short amount of time, e.g. after a cluster wide bounce.
+	err = fdbstatus.CanSafelyChangeCoordinators(
+		logger,
+		cluster,
+		status,
+		r.MinimumUptimeForCoordinatorChangeWithMissingProcess,
+		r.MinimumUptimeForCoordinatorChangeWithUndesiredProcess,
+	)
+	if err != nil {
+		logger.Info("Deferring coordinator change due to safety check", "error", err.Error())
+		return &requeue{curError: err, delayedRequeue: true}
+	}
+
 	err = r.takeLock(logger, cluster, "changing coordinators")
 	if err != nil {
 		return &requeue{curError: err, delayedRequeue: true}
```
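
The safety check runs before the operator takes the coordination lock, so a failed check requeues with a delay and produces no side effects. The `fdbstatus.CanSafelyChangeCoordinators` implementation itself is not shown in this commit view, but the test expectations below pin down its observable behavior. The following is a minimal sketch of that behavior, not the real code: the ten-generation limit, the error wording, and the `missingCoordinators` input are inferred from the tests.

```go
// Sketch only: inferred from controllers/change_coordinators_test.go; the
// actual implementation lives in pkg/fdbstatus and may be structured differently.
package main

import (
	"fmt"
	"time"
)

// recoveryState mirrors the fields of fdbv1beta2.RecoveryState used here.
type recoveryState struct {
	SecondsSinceLastRecovered float64
	ActiveGenerations         int
}

func canSafelyChangeCoordinators(
	recovery recoveryState,
	missingCoordinators int, // assumed to be derived from the machine-readable status
	minUptimeMissing time.Duration,
	minUptimeUndesired time.Duration,
) error {
	// A missing coordinator only has to satisfy the (longer) uptime threshold:
	// the tests show the change proceeding even with 11 active generations.
	if missingCoordinators > 0 {
		if recovery.SecondsSinceLastRecovered < minUptimeMissing.Seconds() {
			return fmt.Errorf(
				"cannot: change coordinators: cluster has %d missing coordinators, clusters last recovery was %.2f seconds ago, waiting until the last recovery was %.0f seconds ago",
				missingCoordinators, recovery.SecondsSinceLastRecovered, minUptimeMissing.Seconds())
		}
		return nil
	}

	// For undesired-but-running coordinators, refuse while too many recovery
	// generations are active; the tests expect a limit of 10.
	if recovery.ActiveGenerations > 10 {
		return fmt.Errorf(
			"cluster has %d active generations, but only 10 active generations are allowed to safely change coordinators",
			recovery.ActiveGenerations)
	}

	// Finally apply the shorter uptime threshold for undesired coordinators.
	if recovery.SecondsSinceLastRecovered < minUptimeUndesired.Seconds() {
		return fmt.Errorf(
			"cannot: change coordinators: cluster is not up for long enough, clusters last recovery was %.2f seconds ago, waiting until the last recovery was %.0f seconds ago",
			recovery.SecondsSinceLastRecovered, minUptimeUndesired.Seconds())
	}

	return nil
}

func main() {
	// 10 seconds of uptime with an undesired coordinator: deferred (300s required).
	fmt.Println(canSafelyChangeCoordinators(
		recoveryState{SecondsSinceLastRecovered: 10.0, ActiveGenerations: 1},
		0, 10*time.Minute, 5*time.Minute))
}
```

Note the ordering implied by the tests: a missing coordinator bypasses the generation limit entirely, while an undesired coordinator must satisfy both the generation limit and the shorter uptime threshold.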

controllers/change_coordinators_test.go

Lines changed: 176 additions & 5 deletions
```diff
@@ -23,6 +23,7 @@ package controllers
 import (
 	"context"
 	"math"
+	"time"
 
 	fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/v2/api/v1beta2"
 	"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal"
@@ -49,10 +50,6 @@ var _ = Describe("Change coordinators", func() {
 			},
 		}
 		Expect(setupClusterForTest(cluster)).NotTo(HaveOccurred())
-
-		var err error
-		_, err = mock.NewMockAdminClientUncast(cluster, k8sClient)
-		Expect(err).NotTo(HaveOccurred())
 	})
 
 	Describe("reconcile", func() {
@@ -69,7 +66,7 @@ var _ = Describe("Change coordinators", func() {
 				clusterReconciler,
 				cluster,
 				nil,
-				globalControllerLogger,
+				testLogger,
 			)
 		})
 
@@ -163,5 +160,179 @@ var _ = Describe("Change coordinators", func() {
 				},
 			)
 		})
+
+		When("safety checks are enabled", func() {
+			BeforeEach(func() {
+				clusterReconciler.MinimumUptimeForCoordinatorChangeWithUndesiredProcess = 5 * time.Minute
+				clusterReconciler.MinimumUptimeForCoordinatorChangeWithMissingProcess = 10 * time.Minute
+				clusterReconciler.EnableRecoveryState = true
+			})
+
+			When("one coordinator is undesired", func() {
+				BeforeEach(func() {
+					adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+					Expect(err).NotTo(HaveOccurred())
+
+					status, err := adminClient.GetStatus()
+					Expect(err).NotTo(HaveOccurred())
+
+					coordinators := map[string]fdbv1beta2.None{}
+					for _, coordinator := range status.Client.Coordinators.Coordinators {
+						coordinators[coordinator.Address.String()] = fdbv1beta2.None{}
+					}
+
+					for _, process := range status.Cluster.Processes {
+						if _, ok := coordinators[process.Address.String()]; !ok {
+							continue
+						}
+						Expect(adminClient.ExcludeProcesses([]fdbv1beta2.ProcessAddress{
+							{
+								IPAddress: process.Address.IPAddress,
+							},
+						})).To(Succeed())
+						break
+					}
+				})
+
+				When("the cluster is up for long enough", func() {
+					It("should change the coordinators", func() {
+						Expect(requeue).To(BeNil())
+						Expect(
+							cluster.Status.ConnectionString,
+						).NotTo(Equal(originalConnectionString))
+					})
+				})
+
+				When("Too many active generations are present", func() {
+					BeforeEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+						Expect(err).NotTo(HaveOccurred())
+						adminClient.ActiveGenerations = ptr.To(11)
+					})
+
+					AfterEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+						Expect(err).NotTo(HaveOccurred())
+						adminClient.ActiveGenerations = nil
+					})
+
+					It("should defer coordinator change and requeue with delay", func() {
+						Expect(requeue).NotTo(BeNil())
+						Expect(requeue.delayedRequeue).To(BeTrue())
+						Expect(requeue.curError).To(HaveOccurred())
+						Expect(
+							requeue.curError.Error(),
+						).To(ContainSubstring("cluster has 11 active generations, but only 10 active generations are allowed to safely change coordinators"))
+						Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
+					})
+				})
+
+				When("the cluster is only up for 10 seconds", func() {
+					BeforeEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+						Expect(err).NotTo(HaveOccurred())
+						adminClient.SecondsSinceLastRecovered = ptr.To(10.0)
+					})
+
+					AfterEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+						Expect(err).NotTo(HaveOccurred())
+						adminClient.SecondsSinceLastRecovered = nil
+					})
+
+					It("should defer coordinator change and requeue with delay", func() {
+						Expect(requeue).NotTo(BeNil())
+						Expect(requeue.delayedRequeue).To(BeTrue())
+						Expect(requeue.curError).To(HaveOccurred())
+						Expect(
+							requeue.curError.Error(),
+						).To(Equal("cannot: change coordinators: cluster is not up for long enough, clusters last recovery was 10.00 seconds ago, waiting until the last recovery was 300 seconds ago"))
+						Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
+					})
+				})
+			})
+
+			When("one coordinator is missing", func() {
+				BeforeEach(func() {
+					adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+					Expect(err).NotTo(HaveOccurred())
+
+					status, err := adminClient.GetStatus()
+					Expect(err).NotTo(HaveOccurred())
+
+					coordinators := map[string]fdbv1beta2.None{}
+					for _, coordinator := range status.Client.Coordinators.Coordinators {
+						coordinators[coordinator.Address.String()] = fdbv1beta2.None{}
+					}
+
+					for _, process := range status.Cluster.Processes {
+						if _, ok := coordinators[process.Address.String()]; !ok {
+							continue
+						}
+						adminClient.MockMissingProcessGroup(
+							fdbv1beta2.ProcessGroupID(
+								process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey],
+							),
+							true,
+						)
+						break
+					}
+				})
+
+				When("the cluster is up for long enough", func() {
+					It("should change the coordinators", func() {
+						Expect(requeue).To(BeNil())
+						Expect(
+							cluster.Status.ConnectionString,
+						).NotTo(Equal(originalConnectionString))
+					})
+				})
+
+				When("Multiple active generations are present", func() {
+					BeforeEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+						Expect(err).NotTo(HaveOccurred())
+						adminClient.ActiveGenerations = ptr.To(11)
+					})
+
+					AfterEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+						Expect(err).NotTo(HaveOccurred())
+						adminClient.ActiveGenerations = nil
+					})
+
+					It("should change the coordinators", func() {
+						Expect(requeue).To(BeNil())
+						Expect(
+							cluster.Status.ConnectionString,
+						).NotTo(Equal(originalConnectionString))
+					})
+				})
+
+				When("the cluster is only up for 10 seconds", func() {
+					BeforeEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+						Expect(err).NotTo(HaveOccurred())
+						adminClient.SecondsSinceLastRecovered = ptr.To(10.0)
+					})
+
+					AfterEach(func() {
+						adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
+						Expect(err).NotTo(HaveOccurred())
+						adminClient.SecondsSinceLastRecovered = nil
+					})
+
+					It("should defer coordinator change and requeue with delay", func() {
+						Expect(requeue).NotTo(BeNil())
+						Expect(requeue.delayedRequeue).To(BeTrue())
+						Expect(requeue.curError).To(HaveOccurred())
+						Expect(
+							requeue.curError.Error(),
+						).To(Equal("cannot: change coordinators: cluster has 1 missing coordinators, clusters last recovery was 10.00 seconds ago, waiting until the last recovery was 600 seconds ago"))
+						Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
+					})
+				})
+			})
+		})
 	})
 })
```

controllers/cluster_controller.go

Lines changed: 10 additions & 0 deletions
```diff
@@ -126,6 +126,16 @@ type FoundationDBClusterReconciler struct {
 	// wait time will increase the chances that all updates are part of the list but will also delay the rollout of
 	// the change.
 	GlobalSynchronizationWaitDuration time.Duration
+	// MinimumUptimeForCoordinatorChangeWithMissingProcess defines the minimum uptime of the cluster before coordinator
+	// changes because of a missing coordinator are allowed.
+	MinimumUptimeForCoordinatorChangeWithMissingProcess time.Duration
+	// MinimumUptimeForCoordinatorChangeWithUndesiredProcess defines the minimum uptime of the cluster before coordinator
+	// changes because of an undesired coordinator are allowed.
+	MinimumUptimeForCoordinatorChangeWithUndesiredProcess time.Duration
+	// MinimumUptimeForConfigurationChanges defines the minimum uptime for the cluster before configuration changes
+	// are allowed.
+	MinimumUptimeForConfigurationChanges time.Duration
+
 	// MinimumRecoveryTimeForInclusion defines the duration in seconds that a cluster must be up
 	// before new inclusions are allowed. The operator issuing frequent inclusions in a short time window
 	// could cause instability for the cluster as each inclusion will/can cause a recovery. Delaying the inclusion
```
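
Each new duration has a matching command-line flag: the e2e fixture later in this commit passes `--minimum-uptime-for-coordinator-change-with-undesired-process`, `--minimum-uptime-for-coordinator-change-with-missing-process`, and `--minimum-uptime-for-configuration-changes`. The flag-binding code is not part of this diff; the following is a hypothetical sketch of how such flags could be wired to the reconciler fields, with placeholder defaults.

```go
// Hypothetical wiring sketch: the operator's actual flag setup is not shown in
// this commit. Only the flag names are taken from the e2e fixture below.
package main

import (
	"flag"
	"time"
)

func main() {
	var (
		minUptimeMissing   time.Duration
		minUptimeUndesired time.Duration
		minUptimeConfig    time.Duration
	)

	flag.DurationVar(&minUptimeMissing,
		"minimum-uptime-for-coordinator-change-with-missing-process",
		10*time.Minute, // assumed default, not stated in this diff
		"minimum cluster uptime before coordinators are changed because a coordinator process is missing")
	flag.DurationVar(&minUptimeUndesired,
		"minimum-uptime-for-coordinator-change-with-undesired-process",
		5*time.Minute, // assumed default, not stated in this diff
		"minimum cluster uptime before coordinators are changed because a coordinator is undesired")
	flag.DurationVar(&minUptimeConfig,
		"minimum-uptime-for-configuration-changes",
		time.Minute, // assumed default, not stated in this diff
		"minimum cluster uptime before database configuration changes are applied")
	flag.Parse()

	// These values would then be copied onto the corresponding
	// FoundationDBClusterReconciler fields before starting the manager.
	_, _, _ = minUptimeMissing, minUptimeUndesired, minUptimeConfig
}
```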

controllers/suite_test.go

Lines changed: 7 additions & 6 deletions
```diff
@@ -209,11 +209,12 @@ func createTestClusterReconciler() *FoundationDBClusterReconciler {
 			SimulateZones: true,
 			SimulateTime:  true,
 		},
-		PodLifecycleManager:          &podmanager.StandardPodLifecycleManager{},
-		PodClientProvider:            mockpodclient.NewMockFdbPodClient,
-		DatabaseClientProvider:       mock.DatabaseClientProvider{},
-		MaintenanceListStaleDuration: 4 * time.Hour,
-		MaintenanceListWaitDuration:  5 * time.Minute,
-		HighRunLoopBusyThreshold:     1.0,
+		PodLifecycleManager:                  &podmanager.StandardPodLifecycleManager{},
+		PodClientProvider:                    mockpodclient.NewMockFdbPodClient,
+		DatabaseClientProvider:               mock.DatabaseClientProvider{},
+		MaintenanceListStaleDuration:         4 * time.Hour,
+		MaintenanceListWaitDuration:          5 * time.Minute,
+		HighRunLoopBusyThreshold:             1.0,
+		MinimumUptimeForConfigurationChanges: 1 * time.Minute,
 	}
 }
```

controllers/update_database_configuration.go

Lines changed: 2 additions & 1 deletion
```diff
@@ -92,8 +92,9 @@ func (u updateDatabaseConfiguration) reconcile(
 		return &requeue{curError: err, delayedRequeue: true}
 	}
 
-	err = fdbstatus.ConfigurationChangeAllowed(
+	err = fdbstatus.ConfigurationChangeAllowedWithMinimumUptime(
 		status,
+		r.MinimumUptimeForConfigurationChanges,
 		runningVersion.SupportsRecoveryState() && r.EnableRecoveryState,
 	)
 	if err != nil {
```
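
The renamed helper threads the configurable `MinimumUptimeForConfigurationChanges` through instead of relying on a hard-coded threshold. Judging by the updated error message in the test below, the uptime portion of the check plausibly looks like this; a sketch only, since the real helper in `pkg/fdbstatus` performs additional safety checks.

```go
// Sketch of the uptime gate implied by the updated test expectation; the real
// ConfigurationChangeAllowedWithMinimumUptime does more than this.
package main

import (
	"fmt"
	"time"
)

func configurationChangeUptimeGate(secondsSinceLastRecovered float64, minimumUptime time.Duration) error {
	if secondsSinceLastRecovered < minimumUptime.Seconds() {
		return fmt.Errorf(
			"cannot: change configuration, clusters last recovery was %.2f seconds ago, waiting until the last recovery was %.0f seconds ago",
			secondsSinceLastRecovered,
			minimumUptime.Seconds(),
		)
	}
	return nil
}

func main() {
	// With the one-minute value used in the test suite, a recovery 0.10 seconds
	// ago is rejected, producing the message asserted in the test below.
	fmt.Println(configurationChangeUptimeGate(0.10, time.Minute))
}
```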

controllers/update_database_configuration_test.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -278,7 +278,7 @@ var _ = Describe("update_database_configuration", func() {
 			Expect(requeue).NotTo(BeNil())
 			Expect(
 				requeue.message,
-			).To(Equal("Configuration change is not safe: clusters last recovery was 0.10 seconds ago, wait until the last recovery was 60 seconds ago, will retry"))
+			).To(Equal("Configuration change is not safe: cannot: change configuration, clusters last recovery was 0.10 seconds ago, waiting until the last recovery was 60 seconds ago, will retry"))
 			Expect(cluster.Status.Configured).To(BeTrue())
 
 			adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
```

e2e/fixtures/fdb_operator_client.go

Lines changed: 3 additions & 0 deletions
```diff
@@ -505,6 +505,9 @@ spec:
             - --cluster-label-key-for-node-trigger=foundationdb.org/fdb-cluster-name
             - --enable-node-index
             - --replace-on-security-context-change
+            - --minimum-uptime-for-coordinator-change-with-undesired-process=20s
+            - --minimum-uptime-for-coordinator-change-with-missing-process=10s
+            - --minimum-uptime-for-configuration-changes=5s
 `
 )
 
```
pkg/fdbadminclient/mock/admin_client_mock.go

Lines changed: 4 additions & 2 deletions
```diff
@@ -72,11 +72,13 @@ type AdminClient struct {
 	localityInfo                             map[fdbv1beta2.ProcessGroupID]map[string]string
 	MaxZoneFailuresWithoutLosingData         *int
 	MaxZoneFailuresWithoutLosingAvailability *int
+	ActiveGenerations                        *int
 	MaintenanceZone                          fdbv1beta2.FaultDomain
 	restoreURL                               string
 	maintenanceZoneStartTimestamp            time.Time
 	MockAdditionTimeForGlobalCoordination    time.Time
 	uptimeSecondsForMaintenanceZone          float64
+	SecondsSinceLastRecovered                *float64
 	TeamTracker                              []fdbv1beta2.FoundationDBStatusTeamTracker
 	Logs                                     []fdbv1beta2.FoundationDBStatusLogInfo
 	mockError                                error
@@ -616,8 +618,8 @@ func (client *AdminClient) GetStatus() (*fdbv1beta2.FoundationDBStatus, error) {
 
 	status.Cluster.RecoveryState = fdbv1beta2.RecoveryState{
 		Name:                      "fully_recovered",
-		SecondsSinceLastRecovered: 600.0,
-		ActiveGenerations:         1,
+		SecondsSinceLastRecovered: ptr.Deref(client.SecondsSinceLastRecovered, 600.0),
+		ActiveGenerations:         ptr.Deref(client.ActiveGenerations, 1),
 	}
 
 	return status, nil
```
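
The two new exported pointer fields let individual tests override the reported recovery state without disturbing the defaults that other tests rely on: `ptr.Deref` from `k8s.io/utils/ptr` returns the pointed-to value when the pointer is non-nil and the supplied fallback otherwise. A minimal standalone illustration:

```go
package main

import (
	"fmt"

	"k8s.io/utils/ptr"
)

func main() {
	var secondsSinceLastRecovered *float64

	// Field unset: Deref falls back to the mock's default of 600 seconds.
	fmt.Println(ptr.Deref(secondsSinceLastRecovered, 600.0)) // 600

	// A test sets the field, e.g. to simulate a very recent recovery.
	secondsSinceLastRecovered = ptr.To(10.0)
	fmt.Println(ptr.Deref(secondsSinceLastRecovered, 600.0)) // 10
}
```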
