Skip to content
This repository has been archived by the owner on Sep 15, 2023. It is now read-only.

Commit

Permalink
cross version upgrade support like: v1.20 to v1.24 (#73)
Browse files Browse the repository at this point in the history
* kubelet restart depends on kubelet-reloader, we need to run it after upgrade-kubeadm

* add support for cross version upgrade like v1.21.0 to v1.24.0

* add master toleration for v1.22 and before

* fix: no need to upgrade to current minor version

* UT: test cross version check

* fix cross task order by renaming <operation-name>-<next-version>-<0x>-<taskname>
  • Loading branch information
pacoxu authored Jun 16, 2022
1 parent 84673c7 commit 9721e44
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 51 deletions.
5 changes: 5 additions & 0 deletions controllers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func createDaemonSet(c client.Client, operation *operatorv1.Operation, namespace
Key: "node-role.kubernetes.io/control-plane",
Effect: corev1.TaintEffectNoSchedule,
},
// to work for v1.22-
{
Key: "node-role.kubernetes.io/master",
Effect: corev1.TaintEffectNoSchedule,
},
},
Containers: []corev1.Container{
{
Expand Down
151 changes: 100 additions & 51 deletions operations/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,92 +38,143 @@ func setupUpgrade() map[string]string {

func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperationSpec, c client.Client) *operatorv1.RuntimeTaskGroupList {
log := ctrl.Log.WithName("operations").WithName("Upgrade").WithValues("task", operation.Name)

// TODO support upgrade to v1.n-1~v1.n of current kubernetes server version.
// If the current kubernetes server version is v1.n-2 which is below the target version, we need to generate a further upgrade plan

var items []operatorv1.RuntimeTaskGroup
dryRun := operation.Spec.GetTypedOperationExecutionMode() == operatorv1.OperationExecutionModeDryRun
serverNeedUpgrade := true

serverVersion, err := getServerVersion()
if err != nil {
log.Error(err, "get server version failed")
return nil
}
isServerSupported, isServerCrossVersion, isServerCanSkip := upgradeCheck(serverVersion, spec.KubernetesVersion)
if !isServerSupported {
log.Info("Upgrade is not supported", "serverVersion", serverVersion, "kubernetesVersion", spec.KubernetesVersion)
// TODO currently an unsupported operation will succeed immediately.
return nil
}
log.Info("Upgrade is supported", "serverVersion", serverVersion, "kubernetesVersion", spec.KubernetesVersion)
nodes, err := listNodesBySelector(c, getAllSelector())
if err != nil {
log.Error(err, "list node failed")
return nil
}
var isClientSupported, isClientCrossVersion, isClientCanSkip bool = true, false, true
var clientServerMatch bool = true
log.Info("nodes list", "nodes", len(nodes.Items))
for _, n := range nodes.Items {
supported, cross, skip := upgradeCheck(n.Status.NodeInfo.KubeletVersion, spec.KubernetesVersion)
isClientSupported = isClientSupported && supported
isClientCrossVersion = isClientCrossVersion || cross
isClientCanSkip = isClientCanSkip && skip
clientServerMatch = clientServerMatch && n.Status.NodeInfo.KubeletVersion == serverVersion
if n.Status.NodeInfo.KubeletVersion != serverVersion {
log.Info("node is not match server version", "node", n.Name, "serverVersion", serverVersion, "kubeletVersion", n.Status.NodeInfo.KubeletVersion)
}
}
if !isClientSupported {
log.Info("Upgrade is not supported", "clientVersion", spec.KubernetesVersion)
return nil
}
// TODO is it the right way to check if the `kubeadm upgrade apply` is successful?
// check ClusterConfiguration.kubernetesVersion in kubeadm-config configmap?
if operation.Spec.Upgrade.KubernetesVersion == serverVersion {
//skip upgrade if the current kubernetes server version is the same as the target version
serverNeedUpgrade = false
}

// nodeNeedUpgrade := true
// nodeVersion, err := getNodeVersion(c, getNodeName())
// if err != nil {
// return nil
// }
// if operation.Spec.Upgrade.KubernetesVersion == nodeVersion {
// //skip upgrade node if the current kubernetes server version is the same as the target version
// nodeNeedUpgrade = false
// }

if serverNeedUpgrade {
t1 := createUpgradeApplyTaskGroup(operation, "01", "upgrade-apply")
log.Info("show all client version check results", "isClientSupported", isClientSupported, "isClientCrossVersion", isClientCrossVersion, "isClientCanSkip", isClientCanSkip, "clientServerMatch", clientServerMatch)
log.Info("show all server version check results", "isServerSupported", isServerSupported, "isServerCrossVersion", isServerCrossVersion, "isServerCanSkip", isServerCanSkip)
if isClientCanSkip && isServerCanSkip {
// skip upgrade directly
return &operatorv1.RuntimeTaskGroupList{
Items: items,
}
} else if isClientCrossVersion || isServerCrossVersion {
// support upgrade to v1.n-1~v1.n of current kubernetes server version.
// If the current kubernetes server version is v1.n-2 which is below the target version, we need to generate a further upgrade plan
log.Info("Upgrade is not supported, need cross version for client or server", "targetVersion", spec.KubernetesVersion, "serverVersion", serverVersion)
if !clientServerMatch {
// upgrade nodes to the target version
log.Info("[cross-upgrade] add items to make server client match", "serverVersion", serverVersion)
items = append(items, planNextUpgrade(operation, serverVersion, c, true)...)
}
crossVersions := getCrossVersions(serverVersion, spec.KubernetesVersion)
for _, v := range crossVersions {
log.Info("[cross-upgrade] add items to upgrade to a middle version", "version", v)
items = append(items, planNextUpgrade(operation, v, c, false)...)
}
log.Info("[cross-upgrade] add items to upgrade to the target version", "version", spec.KubernetesVersion)
items = append(items, planNextUpgrade(operation, operation.Spec.Upgrade.KubernetesVersion, c, false)...)

} else {
log.Info("add items to upgrade to the target version", "version", spec.KubernetesVersion)
items = append(items, planNextUpgrade(operation, operation.Spec.Upgrade.KubernetesVersion, c, isServerCanSkip)...)
}

return &operatorv1.RuntimeTaskGroupList{
Items: items,
}
}

// the version may not be operation.Spec.Upgrade.KubernetesVersion for cross upgrade
func planNextUpgrade(operation *operatorv1.Operation, version string, c client.Client, isServerCanSkip bool) []operatorv1.RuntimeTaskGroup {
log := ctrl.Log.WithName("operations").WithName("Upgrade").WithValues("task", operation.Name)
log.Info("add task for upgrading", "version", version, "isServerCanSkip", isServerCanSkip)

var items []operatorv1.RuntimeTaskGroup
dryRun := operation.Spec.GetTypedOperationExecutionMode() == operatorv1.OperationExecutionModeDryRun

if !isServerCanSkip {
t1 := createUpgradeApplyTaskGroup(operation, fmt.Sprintf("%s-01", version), "upgrade-apply")
setCP1Selector(&t1)
// run `upgrade apply`` on the first node of all control plane
t1.Spec.NodeFilter = string(operatorv1.RuntimeTaskGroupNodeFilterHead)
t1.Spec.Template.Spec.Commands = append(t1.Spec.Template.Spec.Commands,
operatorv1.CommandDescriptor{
UpgradeKubeadm: &operatorv1.UpgradeKubeadmCommandSpec{
KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion,
Local: operation.Spec.Upgrade.Local,
},
},
operatorv1.CommandDescriptor{
UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{
KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion,
KubernetesVersion: version,
Local: operation.Spec.Upgrade.Local,
},
},
operatorv1.CommandDescriptor{
KubeadmUpgradeApply: &operatorv1.KubeadmUpgradeApplyCommandSpec{
DryRun: dryRun,
KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion,
KubernetesVersion: version,
SkipKubeProxy: operation.Spec.Upgrade.UpgradeKubeProxyAtLast,
},
},
// as it depends on kubelet-reloader, we need to run it after upgrade-kubeadm apply
operatorv1.CommandDescriptor{
UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{
KubernetesVersion: version,
Local: operation.Spec.Upgrade.Local,
},
},
)
log.Info("add upgrade-apply task group", "task", t1.Name)
items = append(items, t1)
}

// this can be skipped if there is only one control-plane node.
// currently it depends on the selector
t2 := createBasicTaskGroup(operation, "02", "upgrade-cp")
t2 := createBasicTaskGroup(operation, fmt.Sprintf("%s-02", version), "upgrade-cp")
setCPSelector(&t2)
cpNodes, err := listNodesBySelector(c, &t2.Spec.NodeSelector)
if err != nil {
log.Info("failed to list nodes.", "error", err)
return nil
return items
}
if len(cpNodes.Items) > 1 {

t2.Spec.Template.Spec.Commands = append(t2.Spec.Template.Spec.Commands,
operatorv1.CommandDescriptor{
UpgradeKubeadm: &operatorv1.UpgradeKubeadmCommandSpec{
KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion,
KubernetesVersion: version,
Local: operation.Spec.Upgrade.Local,
},
},
operatorv1.CommandDescriptor{
UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{
KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion,
Local: operation.Spec.Upgrade.Local,
KubeadmUpgradeNode: &operatorv1.KubeadmUpgradeNodeCommandSpec{
DryRun: operatorv1.OperationExecutionMode(operation.Spec.ExecutionMode) == operatorv1.OperationExecutionModeDryRun,
},
},
// as it depends on kubelet-reloader, we need to run it after upgrade-kubeadm
operatorv1.CommandDescriptor{
KubeadmUpgradeNode: &operatorv1.KubeadmUpgradeNodeCommandSpec{
DryRun: operatorv1.OperationExecutionMode(operation.Spec.ExecutionMode) == operatorv1.OperationExecutionModeDryRun,
UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{
KubernetesVersion: version,
Local: operation.Spec.Upgrade.Local,
},
},
)
Expand All @@ -132,11 +183,11 @@ func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperat
}

if operation.Spec.Upgrade.UpgradeKubeProxyAtLast {
t3 := createBasicTaskGroup(operation, "03", "upgrade-kube-proxy")
t3 := createBasicTaskGroup(operation, fmt.Sprintf("%s-03", version), "upgrade-kube-proxy")
t3.Spec.Template.Spec.Commands = append(t3.Spec.Template.Spec.Commands,
operatorv1.CommandDescriptor{
KubeadmUpgradeKubeProxy: &operatorv1.KubeadmUpgradeKubeProxySpec{
KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion,
KubernetesVersion: version,
},
},
)
Expand All @@ -146,27 +197,28 @@ func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperat

// this can be skipped if there are no worker nodes.
// currently it depends on the selector
t4 := createBasicTaskGroup(operation, "04", "upgrade-w")
t4 := createBasicTaskGroup(operation, fmt.Sprintf("%s-04", version), "upgrade-worker")
setWSelector(&t4)
workerNodes, err := listNodesBySelector(c, &t4.Spec.NodeSelector)
if err != nil {
fmt.Printf("failed to list nodes: %v", err)
return nil
return items
}
log.Info("workerNodes check", "workerNum", len(workerNodes.Items))
if len(workerNodes.Items) > 0 {
t4.Spec.Template.Spec.Commands = append(t4.Spec.Template.Spec.Commands,
operatorv1.CommandDescriptor{
KubectlDrain: &operatorv1.KubectlDrainCommandSpec{},
},
operatorv1.CommandDescriptor{
UpgradeKubeadm: &operatorv1.UpgradeKubeadmCommandSpec{
KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion,
KubernetesVersion: version,
Local: operation.Spec.Upgrade.Local,
},
},
operatorv1.CommandDescriptor{
UpgradeKubeletAndKubeactl: &operatorv1.UpgradeKubeletAndKubeactlCommandSpec{
KubernetesVersion: operation.Spec.Upgrade.KubernetesVersion,
KubernetesVersion: version,
Local: operation.Spec.Upgrade.Local,
},
},
Expand All @@ -182,10 +234,7 @@ func planUpgrade(operation *operatorv1.Operation, spec *operatorv1.UpgradeOperat
log.Info("add upgrade-w task group", "task", t4.Name)
items = append(items, t4)
}

return &operatorv1.RuntimeTaskGroupList{
Items: items,
}
return items
}

// check the current kubernetes server version
Expand Down
8 changes: 8 additions & 0 deletions operations/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ func setWSelector(t *operatorv1.RuntimeTaskGroup) {
}
}

// getAllSelector returns a label selector that matches every Linux node in
// the cluster, i.e. all nodes carrying the kubernetes.io/os=linux label.
func getAllSelector() *metav1.LabelSelector {
	linuxNodes := map[string]string{"kubernetes.io/os": "linux"}
	return &metav1.LabelSelector{MatchLabels: linuxNodes}
}

func fixupCustomTaskGroup(operation *operatorv1.Operation, taskgroup operatorv1.RuntimeTaskGroup, taskdeploymentOrder string) operatorv1.RuntimeTaskGroup {
gv := operatorv1.GroupVersion

Expand Down
103 changes: 103 additions & 0 deletions operations/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operations

import (
"fmt"

"k8s.io/apimachinery/pkg/util/version"
)

// upgradeCheck parses the two semantic version strings and classifies the
// upgrade from current to target: whether it is supported at all, whether
// it crosses more than one minor release (and so must be split into steps),
// and whether it can be skipped entirely. A version string that fails to
// parse yields (false, false, false).
func upgradeCheck(current, target string) (isSupported, isCrossVersion, canSkip bool) {
	cur, curErr := version.ParseSemantic(current)
	tgt, tgtErr := version.ParseSemantic(target)
	if curErr != nil || tgtErr != nil {
		return false, false, false
	}
	return upgradeCheckVersion(cur, tgt)
}

// upgradeCheckVersion compares two parsed versions and classifies the
// transition from current to target:
//   - isSupported: a transition the operator is willing to perform
//   - isCrossVersion: more than one minor apart, so the upgrade must be
//     decomposed into successive single-minor steps
//   - canSkip: the versions are identical and nothing needs to be done
//
// The major version is not compared: only major 1 exists in practice.
func upgradeCheckVersion(current, target *version.Version) (isSupported, isCrossVersion, canSkip bool) {
	switch {
	case current.Minor() == target.Minor():
		// Same minor line: identical patch is a no-op, otherwise a
		// plain patch-level upgrade.
		identical := current.Patch() == target.Patch()
		return true, false, identical
	case current.Minor() < target.Minor():
		// Forward move: a single minor step is direct; anything further
		// requires crossing intermediate versions.
		needsCross := current.Minor()+1 != target.Minor()
		return true, needsCross, false
	case current.Minor()-1 == target.Minor():
		// TODO downgrade to a minor version
		// this is just for test purpose, need to define if it should be supported in the future
		return true, false, false
	default:
		// Downgrades spanning more than one minor are rejected.
		return false, false, false
	}
}

// isSupported reports whether the version string names a Kubernetes release
// the operator can act on; malformed strings count as unsupported.
func isSupported(ver string) bool {
	parsed, err := version.ParseSemantic(ver)
	return err == nil && isSupportedVersion(parsed)
}

// isSupportedVersion reports whether ver falls inside the operator's
// supported range: the 1.x major line, minor 1.17 or newer.
// TODO a table of supported versions needs to be created in the docs
func isSupportedVersion(ver *version.Version) bool {
	// BUG FIX: the condition previously used &&, which accepted every
	// major-1 version (even v1.0) and every non-1 major with minor >= 17.
	// A version is unsupported when it is not in the 1.x line OR is older
	// than v1.17, hence ||.
	if ver.Major() != 1 || ver.Minor() < 17 {
		return false
	}
	return true
}

// getCrossVersions lists the intermediate ".0" minor releases that must be
// stepped through when upgrading from current to target, exclusive on both
// ends (e.g. v1.20 -> v1.24 yields v1.21.0, v1.22.0, v1.23.0). The result
// is empty when either version fails to parse or when the upgrade does not
// actually cross more than one minor release. Callers should have verified
// that both versions are supported before calling this.
func getCrossVersions(current, target string) []string {
	middle := []string{}
	cur, curErr := version.ParseSemantic(current)
	tgt, tgtErr := version.ParseSemantic(target)
	if curErr != nil || tgtErr != nil {
		return middle
	}
	if _, isCross, _ := upgradeCheckVersion(cur, tgt); !isCross {
		return middle
	}
	lastMinor := tgt.Minor()
	for minor := cur.Minor() + 1; minor < lastMinor; minor++ {
		middle = append(middle, fmt.Sprintf("v1.%d.0", minor))
	}
	return middle
}
Loading

0 comments on commit 9721e44

Please sign in to comment.