Implement etcd member management in pre-terminate hook #435

Merged
6 changes: 6 additions & 0 deletions controlplane/api/v1beta1/rke2controlplane_types.go
@@ -417,6 +417,12 @@ const (
// RollingUpdateStrategyType replaces the old control planes with new ones using a rolling update,
// i.e. it gradually scales up or down the old control planes and scales up or down the new ones.
RollingUpdateStrategyType RolloutStrategyType = "RollingUpdate"

// PreTerminateHookCleanupAnnotation is the annotation RKE2 sets on Machines to ensure it can later remove the
// etcd member right before Machine termination (i.e. before InfraMachine deletion).
// Note: Starting with Kubernetes v1.31 this hook will wait for all other pre-terminate hooks to finish to
// ensure it runs last (thus ensuring that kubelet is still working while other pre-terminate hooks run).
PreTerminateHookCleanupAnnotation = clusterv1.PreTerminateDeleteHookAnnotationPrefix + "/rke2-cleanup"
)

func init() { //nolint:gochecknoinits
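Note on the new constant: assuming the upstream Cluster API prefix value ("pre-terminate.delete.hook.machine.cluster.x-k8s.io" for clusterv1.PreTerminateDeleteHookAnnotationPrefix), the annotation key this PR sets on Machines resolves to "pre-terminate.delete.hook.machine.cluster.x-k8s.io/rke2-cleanup". A minimal, self-contained sketch:

package main

import "fmt"

const (
	// Assumed value of clusterv1.PreTerminateDeleteHookAnnotationPrefix upstream.
	preTerminatePrefix = "pre-terminate.delete.hook.machine.cluster.x-k8s.io"

	// Mirrors PreTerminateHookCleanupAnnotation from the hunk above.
	preTerminateHookCleanupAnnotation = preTerminatePrefix + "/rke2-cleanup"
)

func main() {
	// Prints: pre-terminate.delete.hook.machine.cluster.x-k8s.io/rke2-cleanup
	fmt.Println(preTerminateHookCleanupAnnotation)
}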
138 changes: 137 additions & 1 deletion controlplane/internal/controllers/rke2controlplane_controller.go
@@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"strings"
"time"

"github.com/blang/semver/v4"
@@ -32,6 +33,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -47,6 +49,7 @@ import (
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/version"

controlplanev1 "github.com/rancher/cluster-api-provider-rke2/controlplane/api/v1beta1"
"github.com/rancher/cluster-api-provider-rke2/pkg/kubeconfig"
@@ -514,6 +517,10 @@ func (r *RKE2ControlPlaneReconciler) reconcileNormal(
return ctrl.Result{}, err
}

if result, err := r.reconcilePreTerminateHook(ctx, controlPlane); err != nil || !result.IsZero() {
return result, err
}

// Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := controlPlane.MachinesNeedingRollout()

@@ -698,14 +705,31 @@ func (r *RKE2ControlPlaneReconciler) reconcileDelete(ctx context.Context,
}

// Delete control plane machines in parallel
machinesToDelete := ownedMachines.Filter(collections.Not(collections.HasDeletionTimestamp))
machinesToDelete := ownedMachines

var errs []error

for i := range machinesToDelete {
m := machinesToDelete[i]
logger := logger.WithValues("machine", m)

// During RKE2CP deletion we don't care about forwarding etcd leadership or removing etcd members,
// so we remove the pre-terminate hook.
// This is important because deleting the RKE2CP removes all etcd members, and there would be no
// member left to forward etcd leadership to once the Machines are gone.
// In this case the reconcileDelete code of the Machine controller also won't execute the Node drain
// or wait for volume detach.
if err := r.removePreTerminateHookAnnotationFromMachine(ctx, m); err != nil {
errs = append(errs, err)

continue
}

if !m.DeletionTimestamp.IsZero() {
// Nothing to do, Machine already has deletionTimestamp set.
continue
}

if err := r.Client.Delete(ctx, machinesToDelete[i]); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to cleanup owned machine")
errs = append(errs, err)
@@ -720,6 +744,8 @@
return ctrl.Result{}, err
}

logger.Info("Waiting for control plane Machines to not exist anymore")

conditions.MarkFalse(rcp, controlplanev1.ResizedCondition, clusterv1.DeletingReason, clusterv1.ConditionSeverityInfo, "")

return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
@@ -909,6 +935,116 @@ func (r *RKE2ControlPlaneReconciler) ClusterToRKE2ControlPlane(ctx context.Conte
}
}

func (r *RKE2ControlPlaneReconciler) reconcilePreTerminateHook(ctx context.Context, controlPlane *rke2.ControlPlane) (ctrl.Result, error) {
// Ensure that every active machine has the pre-terminate hook set
patchHookAnnotation := false

for _, machine := range controlPlane.Machines.Filter(collections.ActiveMachines) {
if _, exists := machine.Annotations[controlplanev1.PreTerminateHookCleanupAnnotation]; !exists {
machine.Annotations[controlplanev1.PreTerminateHookCleanupAnnotation] = ""
patchHookAnnotation = true
}
}

if patchHookAnnotation {
// Patch machine annotations
if err := controlPlane.PatchMachines(ctx); err != nil {
return ctrl.Result{}, err
}
}

if !controlPlane.HasDeletingMachine() {
return ctrl.Result{}, nil
}

log := ctrl.LoggerFrom(ctx)

// Return early if there is already a deleting Machine without the pre-terminate hook.
// We are going to wait until this Machine goes away before running the pre-terminate hook on other Machines.
for _, deletingMachine := range controlPlane.DeletingMachines() {
if _, exists := deletingMachine.Annotations[controlplanev1.PreTerminateHookCleanupAnnotation]; !exists {
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}
}

// Pick the Machine with the oldest deletionTimestamp to keep this function deterministic / reentrant
// so we only remove the pre-terminate hook from one Machine at a time.
deletingMachines := controlPlane.DeletingMachines()
deletingMachine := controlPlane.SortedByDeletionTimestamp(deletingMachines)[0]

log = log.WithValues("Machine", klog.KObj(deletingMachine))
ctx = ctrl.LoggerInto(ctx, log)

parsedVersion, err := semver.ParseTolerant(controlPlane.RCP.Spec.Version)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to parse Kubernetes version %q", controlPlane.RCP.Spec.Version)
}

// Return early if there are other pre-terminate hooks for the Machine.
// The CAPRKE2 pre-terminate hook should be the one executed last, so that kubelet
// is still working while other pre-terminate hooks are run.
// Note: This is done only for Kubernetes >= v1.31 to reduce the blast radius of this check.
if version.Compare(parsedVersion, semver.MustParse("1.31.0"), version.WithoutPreReleases()) >= 0 {
if machineHasOtherPreTerminateHooks(deletingMachine) {
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}
}

// Return early because the Machine controller is not yet waiting for the pre-terminate hook.
c := conditions.Get(deletingMachine, clusterv1.PreTerminateDeleteHookSucceededCondition)
if c == nil || c.Status != corev1.ConditionFalse || c.Reason != clusterv1.WaitingExternalHookReason {
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}

// The following will execute and remove the pre-terminate hook from the Machine.

// If we have more than 1 Machine and etcd is managed we forward etcd leadership and remove the member
// to keep the etcd cluster healthy.
if controlPlane.Machines.Len() > 1 {
workloadCluster, err := r.GetWorkloadCluster(ctx, controlPlane)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err,
"failed to remove etcd member for deleting Machine %s: failed to create client to workload cluster", klog.KObj(deletingMachine))
}

// Note: In regular deletion cases (remediation, scale down) the leader should already have been moved.
// We're doing this again here in case the Machine became the leader again or the Machine deletion was
// triggered in another way (e.g. a user running kubectl delete machine).
etcdLeaderCandidate := controlPlane.Machines.Filter(collections.Not(collections.HasDeletionTimestamp)).Newest()
if etcdLeaderCandidate != nil {
if err := workloadCluster.ForwardEtcdLeadership(ctx, deletingMachine, etcdLeaderCandidate); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to move leadership to candidate Machine %s", etcdLeaderCandidate.Name)
}
} else {
log.Info("Skip forwarding etcd leadership, because there is no other control plane Machine without a deletionTimestamp")
}

// Note: Removing the etcd member will lead to the etcd and the kube-apiserver Pod on the Machine shutting down.
if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, deletingMachine); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to remove etcd member for deleting Machine %s", klog.KObj(deletingMachine))
}
}

if err := r.removePreTerminateHookAnnotationFromMachine(ctx, deletingMachine); err != nil {
return ctrl.Result{}, err
}

log.Info("Waiting for Machines to be deleted", "machines",
strings.Join(controlPlane.Machines.Filter(collections.HasDeletionTimestamp).Names(), ", "))

return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}

func machineHasOtherPreTerminateHooks(machine *clusterv1.Machine) bool {
for k := range machine.Annotations {
if strings.HasPrefix(k, clusterv1.PreTerminateDeleteHookAnnotationPrefix) && k != controlplanev1.PreTerminateHookCleanupAnnotation {
return true
}
}

return false
}

// getWorkloadCluster gets a cluster object.
// The cluster comes with an etcd client generator to connect to any etcd pod living on a managed machine.
func (r *RKE2ControlPlaneReconciler) getWorkloadCluster(ctx context.Context, clusterKey types.NamespacedName) (rke2.WorkloadCluster, error) {
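To make the hook-ordering rule concrete: machineHasOtherPreTerminateHooks reports whether any pre-terminate hook other than the RKE2 cleanup hook is still set, in which case reconcilePreTerminateHook requeues so that the rke2-cleanup hook runs last (keeping kubelet working while the other hooks execute). A self-contained sketch of that check (the prefix value and the second hook name are assumptions for illustration):

package main

import (
	"fmt"
	"strings"
)

const (
	// Assumed value of clusterv1.PreTerminateDeleteHookAnnotationPrefix.
	prefix            = "pre-terminate.delete.hook.machine.cluster.x-k8s.io"
	cleanupAnnotation = prefix + "/rke2-cleanup"
)

// hasOtherPreTerminateHooks mirrors the machineHasOtherPreTerminateHooks logic
// from the diff, operating on a bare annotations map.
func hasOtherPreTerminateHooks(annotations map[string]string) bool {
	for k := range annotations {
		if strings.HasPrefix(k, prefix) && k != cleanupAnnotation {
			return true
		}
	}

	return false
}

func main() {
	annotations := map[string]string{
		cleanupAnnotation:         "",
		prefix + "/my-other-hook": "", // hypothetical hook set by another controller
	}
	// Prints true: another hook is present, so the reconciler would requeue.
	fmt.Println(hasOtherPreTerminateHooks(annotations))
}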
32 changes: 26 additions & 6 deletions controlplane/internal/controllers/scale.go
@@ -29,7 +29,9 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
@@ -159,11 +161,7 @@ func (r *RKE2ControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, err
}

if err := r.workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil {
logger.Error(err, "Failed to remove etcd member for machine")

return ctrl.Result{}, err
}
// NOTE: etcd member removal will be performed by the rke2-cleanup hook after the Machine completes drain and all volumes are detached.

logger = logger.WithValues("machine", machineToDelete)
if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
@@ -178,6 +176,25 @@
return ctrl.Result{Requeue: true}, nil
}

func (r *RKE2ControlPlaneReconciler) removePreTerminateHookAnnotationFromMachine(ctx context.Context, machine *clusterv1.Machine) error {
if _, exists := machine.Annotations[controlplanev1.PreTerminateHookCleanupAnnotation]; !exists {
// Nothing to do, the annotation is not set (anymore) on the Machine
return nil
}

log := ctrl.LoggerFrom(ctx)
log.Info("Removing pre-terminate hook from control plane Machine")

machineOriginal := machine.DeepCopy()
delete(machine.Annotations, controlplanev1.PreTerminateHookCleanupAnnotation)

if err := r.Client.Patch(ctx, machine, client.MergeFrom(machineOriginal)); err != nil {
return errors.Wrapf(err, "failed to remove pre-terminate hook from control plane Machine %s", klog.KObj(machine))
}

return nil
}

// preflightChecks checks if the control plane is stable before proceeding with a scale up/scale down operation,
// where stable means that:
// - There are no machine deletions in progress
@@ -447,7 +464,10 @@ func (r *RKE2ControlPlaneReconciler) generateMachine(
return errors.Wrap(err, "failed to marshal cluster configuration")
}

machine.SetAnnotations(map[string]string{controlplanev1.RKE2ServerConfigurationAnnotation: string(serverConfig)})
machine.SetAnnotations(map[string]string{
controlplanev1.RKE2ServerConfigurationAnnotation: string(serverConfig),
controlplanev1.PreTerminateHookCleanupAnnotation: "",
})

if err := r.Client.Create(ctx, machine); err != nil {
return errors.Wrap(err, "failed to create machine")
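One note on removePreTerminateHookAnnotationFromMachine: client.MergeFrom produces a JSON merge patch against the original object, so deleting the annotation key locally serializes as an explicit null and removes only that annotation server-side, leaving other fields untouched. A sketch of the resulting patch, assuming evanphx/json-patch (which controller-runtime uses for merge patches, as far as I know):

package main

import (
	"fmt"

	jsonpatch "github.com/evanphx/json-patch/v5"
)

func main() {
	// State before and after delete(machine.Annotations, ...), reduced to JSON.
	original := []byte(`{"metadata":{"annotations":{"pre-terminate.delete.hook.machine.cluster.x-k8s.io/rke2-cleanup":"","other":"x"}}}`)
	modified := []byte(`{"metadata":{"annotations":{"other":"x"}}}`)

	patch, err := jsonpatch.CreateMergePatch(original, modified)
	if err != nil {
		panic(err)
	}

	// Prints: {"metadata":{"annotations":{"pre-terminate.delete.hook.machine.cluster.x-k8s.io/rke2-cleanup":null}}}
	fmt.Println(string(patch))
}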
44 changes: 44 additions & 0 deletions pkg/rke2/control_plane.go
@@ -18,6 +18,7 @@ package rke2

import (
"context"
"sort"

"github.com/go-logr/logr"
"github.com/pkg/errors"
@@ -268,6 +269,23 @@ func (c *ControlPlane) HasDeletingMachine() bool {
return len(c.Machines.Filter(collections.HasDeletionTimestamp)) > 0
}

// DeletingMachines returns machines in the control plane that are in the process of being deleted.
func (c *ControlPlane) DeletingMachines() collections.Machines {
return c.Machines.Filter(collections.HasDeletionTimestamp)
}

// SortedByDeletionTimestamp returns the machines sorted by deletion timestamp.
func (c *ControlPlane) SortedByDeletionTimestamp(s collections.Machines) []*clusterv1.Machine {
res := make(machinesByDeletionTimestamp, 0, len(s))
for _, value := range s {
res = append(res, value)
}

sort.Sort(res)

return res
}

// MachinesNeedingRollout return a list of machines that need to be rolled out.
func (c *ControlPlane) MachinesNeedingRollout() collections.Machines {
// Ignore machines to be deleted.
@@ -383,3 +401,29 @@ func (c *ControlPlane) PatchMachines(ctx context.Context) error {

return kerrors.NewAggregate(errList)
}

// machinesByDeletionTimestamp sorts a list of Machines by deletion timestamp, using their names as a tie breaker.
// Machines without DeletionTimestamp go after machines with this field set.
type machinesByDeletionTimestamp []*clusterv1.Machine

func (o machinesByDeletionTimestamp) Len() int { return len(o) }
func (o machinesByDeletionTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o machinesByDeletionTimestamp) Less(i, j int) bool {
if o[i].DeletionTimestamp == nil && o[j].DeletionTimestamp == nil {
return o[i].Name < o[j].Name
}

if o[i].DeletionTimestamp == nil {
return false
}

if o[j].DeletionTimestamp == nil {
return true
}

if o[i].DeletionTimestamp.Equal(o[j].DeletionTimestamp) {
return o[i].Name < o[j].Name
}

return o[i].DeletionTimestamp.Before(o[j].DeletionTimestamp)
}
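To pin down the comparator's contract, a runnable sketch with a simplified stand-in for clusterv1.Machine: Machines without a deletionTimestamp sort last, ties break by name, and reconcilePreTerminateHook takes index 0, i.e. the Machine whose deletion started first.

package main

import (
	"fmt"
	"sort"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// machine is a simplified stand-in for clusterv1.Machine with only the fields
// the comparator inspects.
type machine struct {
	Name              string
	DeletionTimestamp *metav1.Time
}

// byDeletionTimestamp mirrors machinesByDeletionTimestamp from the diff.
type byDeletionTimestamp []machine

func (o byDeletionTimestamp) Len() int      { return len(o) }
func (o byDeletionTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o byDeletionTimestamp) Less(i, j int) bool {
	if o[i].DeletionTimestamp == nil && o[j].DeletionTimestamp == nil {
		return o[i].Name < o[j].Name
	}

	if o[i].DeletionTimestamp == nil {
		return false
	}

	if o[j].DeletionTimestamp == nil {
		return true
	}

	if o[i].DeletionTimestamp.Equal(o[j].DeletionTimestamp) {
		return o[i].Name < o[j].Name
	}

	return o[i].DeletionTimestamp.Before(o[j].DeletionTimestamp)
}

func main() {
	t1 := metav1.NewTime(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
	t2 := metav1.NewTime(time.Date(2024, 1, 2, 0, 0, 0, 0, time.UTC))

	ms := byDeletionTimestamp{
		{Name: "m-live"},                          // no deletionTimestamp: sorts last
		{Name: "m-newer", DeletionTimestamp: &t2}, // deleted later
		{Name: "m-older", DeletionTimestamp: &t1}, // oldest deletion: picked first
	}
	sort.Sort(ms)

	// Prints: m-older m-newer m-live
	fmt.Println(ms[0].Name, ms[1].Name, ms[2].Name)
}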
@@ -161,7 +161,6 @@ spec:
cni: calico
disableComponents:
kubernetesComponents: [ "cloudController"]
nodeDrainTimeout: 2m
rolloutStrategy:
type: "RollingUpdate"
rollingUpdate: