Skip to content

Commit

Permalink
Fix blocking by kubelet-restart op
Browse files Browse the repository at this point in the history
  • Loading branch information
zoetrope committed Sep 7, 2023
1 parent c798e29 commit 60fcbc5
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 45 deletions.
89 changes: 49 additions & 40 deletions server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,35 +105,32 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain

func riversOps(c *cke.Cluster, nf *NodeFilter, maxConcurrentUpdates int) (ops []cke.Operator) {
if nodes := nf.SSHConnectedNodes(nf.RiversStoppedNodes(), true, true); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return op.RiversBootOp(ns, nf.ControlPlane(), c.Options.Rivers, op.RiversContainerName, op.RiversUpstreamPort, op.RiversListenPort)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, op.RiversBootOp(nodes[:max], nf.ControlPlane(), c.Options.Rivers, op.RiversContainerName, op.RiversUpstreamPort, op.RiversListenPort))
}
if nodes := nf.SSHConnectedNodes(nf.RiversOutdatedNodes(), true, true); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return op.RiversRestartOp(ns, nf.ControlPlane(), c.Options.Rivers, op.RiversContainerName, op.RiversUpstreamPort, op.RiversListenPort)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, op.RiversRestartOp(nodes[:max], nf.ControlPlane(), c.Options.Rivers, op.RiversContainerName, op.RiversUpstreamPort, op.RiversListenPort))
}
if nodes := nf.SSHConnectedNodes(nf.EtcdRiversStoppedNodes(), true, false); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return op.RiversBootOp(ns, nf.ControlPlane(), c.Options.EtcdRivers, op.EtcdRiversContainerName, op.EtcdRiversUpstreamPort, op.EtcdRiversListenPort)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, op.RiversBootOp(nodes[:max], nf.ControlPlane(), c.Options.EtcdRivers, op.EtcdRiversContainerName, op.EtcdRiversUpstreamPort, op.EtcdRiversListenPort))
}
if nodes := nf.SSHConnectedNodes(nf.EtcdRiversOutdatedNodes(), true, false); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return op.RiversRestartOp(ns, nf.ControlPlane(), c.Options.EtcdRivers, op.EtcdRiversContainerName, op.EtcdRiversUpstreamPort, op.EtcdRiversListenPort)
}, maxConcurrentUpdates)...)
}
return ops
}

func splitOperators(nodes []*cke.Node, createOps func(ns []*cke.Node) cke.Operator, maxConcurrentUpdates int) (ops []cke.Operator) {
for i := 0; i < len(nodes); i += maxConcurrentUpdates {
end := i + maxConcurrentUpdates
if end > len(nodes) {
end = len(nodes)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, createOps(nodes[i:end]))
ops = append(ops, op.RiversRestartOp(nodes[:max], nf.ControlPlane(), c.Options.EtcdRivers, op.EtcdRiversContainerName, op.EtcdRiversUpstreamPort, op.EtcdRiversListenPort))
}
return ops
}
Expand Down Expand Up @@ -164,34 +161,46 @@ func k8sOps(c *cke.Cluster, nf *NodeFilter, cs *cke.ClusterStatus, maxConcurrent
// For all nodes
apiServer := nf.HealthyAPIServer()
if nodes := nf.SSHConnectedNodes(nf.KubeletUnrecognizedNodes(), true, true); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return k8s.KubeletRestartOp(ns, c.Name, c.Options.Kubelet, cs.NodeStatuses)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, k8s.KubeletRestartOp(nodes[:max], c.Name, c.Options.Kubelet, cs.NodeStatuses))
}
if nodes := nf.SSHConnectedNodes(nf.KubeletStoppedNodes(), true, true); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return k8s.KubeletBootOp(ns, nf.KubeletStoppedRegisteredNodes(), apiServer, c.Name, c.Options.Kubelet, cs.NodeStatuses)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, k8s.KubeletBootOp(nodes[:max], nf.KubeletStoppedRegisteredNodes(), apiServer, c.Name, c.Options.Kubelet, cs.NodeStatuses))
}
if nodes := nf.SSHConnectedNodes(nf.KubeletOutdatedNodes(), true, true); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return k8s.KubeletRestartOp(ns, c.Name, c.Options.Kubelet, cs.NodeStatuses)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, k8s.KubeletRestartOp(nodes[:max], c.Name, c.Options.Kubelet, cs.NodeStatuses))
}
if nodes := nf.SSHConnectedNodes(nf.ProxyStoppedNodes(), true, true); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return k8s.KubeProxyBootOp(ns, c.Name, "", c.Options.Proxy)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, k8s.KubeProxyBootOp(nodes[:max], c.Name, "", c.Options.Proxy))
}
if nodes := nf.SSHConnectedNodes(nf.ProxyOutdatedNodes(c.Options.Proxy), true, true); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return k8s.KubeProxyRestartOp(ns, c.Name, "", c.Options.Proxy)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, k8s.KubeProxyRestartOp(nodes[:max], c.Name, "", c.Options.Proxy))
}
if nodes := nf.SSHConnectedNodes(nf.ProxyRunningUnexpectedlyNodes(), true, true); len(nodes) > 0 {
ops = append(ops, splitOperators(nodes, func(ns []*cke.Node) cke.Operator {
return op.ProxyStopOp(ns)
}, maxConcurrentUpdates)...)
max := maxConcurrentUpdates
if len(nodes) < max {
max = len(nodes)
}
ops = append(ops, op.ProxyStopOp(nodes[:max]))
}
return ops
}
Expand Down
6 changes: 1 addition & 5 deletions server/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func TestDecideOps(t *testing.T) {
{
Name: "BootRivers",
Input: newData(),
ExpectedOps: []opData{{"rivers-bootstrap", 5}, {"rivers-bootstrap", 1}, {"etcd-rivers-bootstrap", 3}},
ExpectedOps: []opData{{"rivers-bootstrap", 5}, {"etcd-rivers-bootstrap", 3}},
},
{
Name: "BootRivers2",
Expand Down Expand Up @@ -749,9 +749,7 @@ func TestDecideOps(t *testing.T) {
{"kube-controller-manager-bootstrap", 3},
{"kube-scheduler-bootstrap", 3},
{"kubelet-bootstrap", 5},
{"kubelet-bootstrap", 1},
{"kube-proxy-bootstrap", 5},
{"kube-proxy-bootstrap", 1},
},
},
{
Expand Down Expand Up @@ -983,7 +981,6 @@ func TestDecideOps(t *testing.T) {
Input: newData().withAllServices().withDisableProxy(),
ExpectedOps: []opData{
{"stop-kube-proxy", 5},
{"stop-kube-proxy", 1},
},
},
{
Expand Down Expand Up @@ -1032,7 +1029,6 @@ func TestDecideOps(t *testing.T) {
ExpectedOps: []opData{
{"kube-apiserver-restart", 3},
{"kubelet-restart", 5},
{"kubelet-restart", 1},
},
},
{
Expand Down

0 comments on commit 60fcbc5

Please sign in to comment.