Skip to content

Commit 210f854

Browse files
authored
Merge pull request #139 from yiwenshao/add-preemption
implement preemption for galaxy-ipam
2 parents dbc3522 + 6a45cc2 commit 210f854

File tree

8 files changed

+153
-5
lines changed

8 files changed

+153
-5
lines changed

pkg/api/k8s/schedulerapi/schedulerapi.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,43 @@ type LabelPreference struct {
9898
Presence bool `json:"presence"`
9999
}
100100

101+
// ExtenderPreemptionResult represents the result returned by preemption phase of extender.
102+
type ExtenderPreemptionResult struct {
103+
NodeNameToMetaVictims map[string]*MetaVictims
104+
}
105+
106+
// ExtenderPreemptionArgs represents the arguments needed by the extender to preempt pods on nodes.
107+
type ExtenderPreemptionArgs struct {
108+
// Pod being scheduled
109+
Pod *corev1.Pod
110+
// Victims map generated by scheduler preemption phase
111+
// Only set NodeNameToMetaVictims if Extender.NodeCacheCapable == true. Otherwise, only set NodeNameToVictims.
112+
NodeNameToVictims map[string]*Victims
113+
NodeNameToMetaVictims map[string]*MetaVictims
114+
}
115+
116+
// Victims represents:
117+
// pods: a group of pods expected to be preempted.
118+
// numPDBViolations: the count of violations of PodDisruptionBudget
119+
type Victims struct {
120+
Pods []*corev1.Pod
121+
NumPDBViolations int64
122+
}
123+
124+
// MetaPod represent identifier for a v1.Pod
125+
type MetaPod struct {
126+
UID string
127+
}
128+
129+
// MetaVictims represents:
130+
// pods: a group of pods expected to be preempted.
131+
// Only Pod identifiers will be sent and user are expect to get v1.Pod in their own way.
132+
// numPDBViolations: the count of violations of PodDisruptionBudget
133+
type MetaVictims struct {
134+
Pods []*MetaPod
135+
NumPDBViolations int64
136+
}
137+
101138
// ExtenderArgs represents the arguments needed by the extender to filter/prioritize
102139
// nodes for a pod.
103140
type ExtenderArgs struct {

pkg/ipam/context/context.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,15 @@ type IPAMContext struct {
4242
DynamicClient dynamic.Interface
4343

4444
PodLister corev1lister.PodLister
45+
NodeLister corev1lister.NodeLister
4546
StatefulSetLister appv1.StatefulSetLister
4647
DeploymentLister appv1.DeploymentLister
4748
PoolLister list.PoolLister
4849
ExtensionLister extensionlister.CustomResourceDefinitionLister
4950

50-
PodInformer coreinformer.PodInformer
51-
FIPInformer galaxyinformer.FloatingIPInformer
51+
PodInformer coreinformer.PodInformer
52+
NodeInformer coreinformer.NodeInformer
53+
FIPInformer galaxyinformer.FloatingIPInformer
5254

5355
informerFactory informers.SharedInformerFactory
5456
crdInformerFactory crdInformer.SharedInformerFactory
@@ -66,6 +68,7 @@ func NewIPAMContext(client kubernetes.Interface, galaxyClient crd_clientset.Inte
6668
}
6769
ctx.informerFactory = informers.NewSharedInformerFactoryWithOptions(ctx.Client, time.Minute)
6870
ctx.PodInformer = ctx.informerFactory.Core().V1().Pods()
71+
ctx.NodeInformer = ctx.informerFactory.Core().V1().Nodes()
6972
statefulsetInformer := ctx.informerFactory.Apps().V1().StatefulSets()
7073
deploymentInformer := ctx.informerFactory.Apps().V1().Deployments()
7174
ctx.crdInformerFactory = crdInformer.NewSharedInformerFactory(ctx.GalaxyClient, 0)
@@ -76,6 +79,7 @@ func NewIPAMContext(client kubernetes.Interface, galaxyClient crd_clientset.Inte
7679
extensionInformer.Informer() // call Informer to actually create an informer
7780

7881
ctx.PodLister = ctx.PodInformer.Lister()
82+
ctx.NodeLister = ctx.NodeInformer.Lister()
7983
ctx.StatefulSetLister = statefulsetInformer.Lister()
8084
ctx.DeploymentLister = deploymentInformer.Lister()
8185
ctx.PoolLister = poolInformer.Lister()

pkg/ipam/floatingip/ipam_crd.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,6 @@ func (ci *crdIpam) ConfigurePool(floatIPs []*FloatingIPPool) error {
338338
glog.Infof("Configure pool done, %d fip pool, %d unallocated, %d allocated", len(ci.FloatingIPs),
339339
len(ci.unallocatedFIPs), len(ci.allocatedFIPs))
340340
}()
341-
342341
sort.Sort(FloatingIPSlice(floatIPs))
343342
ips, err := ci.listFloatingIPs()
344343
if err != nil {

pkg/ipam/schedulerplugin/ipam.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (p *FloatingIPPlugin) getNodeSubnetfromIPAM(node *corev1.Node) (*net.IPNet,
161161
return nil, errors.New("FloatingIPPlugin:UnknowNode")
162162
}
163163
if ipNet := p.ipam.NodeSubnet(nodeIP); ipNet != nil {
164-
glog.V(4).Infof("node %s %s %s", node.Name, nodeIP.String(), ipNet.String())
164+
glog.V(4).Infof("node subnet cache %s %s %s", node.Name, nodeIP.String(), ipNet.String())
165165
p.nodeSubnet[node.Name] = ipNet
166166
return ipNet, nil
167167
} else {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package schedulerplugin
2+
3+
import (
4+
glog "k8s.io/klog"
5+
"tkestack.io/galaxy/pkg/api/galaxy/constant"
6+
"tkestack.io/galaxy/pkg/api/k8s/schedulerapi"
7+
)
8+
9+
func fillNodeNameToMetaVictims(args *schedulerapi.ExtenderPreemptionArgs) {
10+
if len(args.NodeNameToVictims) != 0 && len(args.NodeNameToMetaVictims) == 0 {
11+
args.NodeNameToMetaVictims = map[string]*schedulerapi.MetaVictims{}
12+
for node, victim := range args.NodeNameToVictims {
13+
metaVictim := &schedulerapi.MetaVictims{
14+
Pods: []*schedulerapi.MetaPod{},
15+
NumPDBViolations: victim.NumPDBViolations,
16+
}
17+
for _, pod := range victim.Pods {
18+
metaPod := &schedulerapi.MetaPod{
19+
UID: string(pod.UID),
20+
}
21+
metaVictim.Pods = append(metaVictim.Pods, metaPod)
22+
}
23+
args.NodeNameToMetaVictims[node] = metaVictim
24+
}
25+
}
26+
}
27+
28+
func (p *FloatingIPPlugin) Preempt(args *schedulerapi.ExtenderPreemptionArgs) map[string]*schedulerapi.MetaVictims {
29+
fillNodeNameToMetaVictims(args)
30+
policy := parseReleasePolicy(&args.Pod.ObjectMeta)
31+
if policy == constant.ReleasePolicyPodDelete {
32+
return args.NodeNameToMetaVictims
33+
}
34+
subnetSet, err := p.getSubnet(args.Pod)
35+
if err != nil {
36+
glog.Errorf("unable to get pod subnets: %v", err)
37+
return args.NodeNameToMetaVictims
38+
}
39+
glog.V(4).Infof("subnet for pod %v is %v", args.Pod.Name, subnetSet)
40+
for nodeName := range args.NodeNameToMetaVictims {
41+
node, err := p.NodeLister.Get(nodeName)
42+
if err != nil {
43+
glog.Errorf("unable to list node: %v", err)
44+
delete(args.NodeNameToMetaVictims, nodeName)
45+
continue
46+
}
47+
subnet, err := p.getNodeSubnet(node)
48+
if err != nil {
49+
glog.Errorf("unable to get node %v subnet: %v", nodeName, err)
50+
delete(args.NodeNameToMetaVictims, nodeName)
51+
continue
52+
}
53+
if !subnetSet.Has(subnet.String()) {
54+
glog.V(4).Infof("remove node %v with subnet %v from victim", node.Name, subnet.String())
55+
delete(args.NodeNameToMetaVictims, nodeName)
56+
}
57+
}
58+
return args.NodeNameToMetaVictims
59+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package schedulerplugin
2+
3+
import (
4+
"gotest.tools/assert"
5+
corev1 "k8s.io/api/core/v1"
6+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
7+
"testing"
8+
"tkestack.io/galaxy/pkg/api/k8s/schedulerapi"
9+
)
10+
11+
func TestFillNodeNameToMetaVictims(t *testing.T) {
12+
args := &schedulerapi.ExtenderPreemptionArgs{
13+
NodeNameToVictims: map[string]*schedulerapi.Victims{
14+
"nod1": {
15+
Pods: []*corev1.Pod{
16+
{
17+
ObjectMeta: metav1.ObjectMeta{
18+
UID: "id1",
19+
},
20+
},
21+
},
22+
},
23+
},
24+
}
25+
fillNodeNameToMetaVictims(args)
26+
assert.Equal(t, len(args.NodeNameToMetaVictims), 1)
27+
}

pkg/ipam/server/server.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ func (s *Server) startServer() {
220220
Writes(schedulerapi.HostPriorityList{}))
221221
ws.Route(ws.POST("/bind").To(s.bind).Reads(schedulerapi.ExtenderBindingArgs{}).
222222
Writes(schedulerapi.ExtenderBindingResult{}))
223+
ws.Route(ws.POST("/preempt").To(s.preempt).Reads(schedulerapi.ExtenderPreemptionArgs{}))
223224
health := new(restful.WebService)
224225
health.Route(health.GET("/healthy").To(s.healthy))
225226
container := restful.NewContainer()
@@ -385,6 +386,26 @@ func (s *Server) bind(request *restful.Request, response *restful.Response) {
385386
_ = response.WriteEntity(result)
386387
}
387388

389+
func (s *Server) preempt(request *restful.Request, response *restful.Response) {
390+
args := new(schedulerapi.ExtenderPreemptionArgs)
391+
if err := request.ReadEntity(&args); err != nil {
392+
glog.Error(err)
393+
_ = response.WriteError(http.StatusInternalServerError, err)
394+
return
395+
}
396+
glog.V(5).Infof("POST preempt %v+", *args)
397+
start := time.Now()
398+
glog.V(3).Infof("preempt for pod %v/%v with %v/%v victims start at %d+",
399+
args.Pod.Namespace, args.Pod.Name, len(args.NodeNameToMetaVictims), len(args.NodeNameToVictims), start.Nanosecond())
400+
nodeNameToMetaVictims := s.plugin.Preempt(args)
401+
glog.V(3).Infof("preempt for pod %v/%v with %v/%v victims start at %d-",
402+
args.Pod.Namespace, args.Pod.Name, len(args.NodeNameToMetaVictims), len(args.NodeNameToVictims), start.Nanosecond())
403+
glog.V(5).Infof("POST preempt %v-", *args)
404+
_ = response.WriteEntity(schedulerapi.ExtenderPreemptionResult{
405+
NodeNameToMetaVictims: nodeNameToMetaVictims,
406+
})
407+
}
408+
388409
func (s *Server) healthy(request *restful.Request, response *restful.Response) {
389410
response.WriteHeader(http.StatusOK)
390411
_, _ = response.Write([]byte("ok"))

yaml/scheduler-policy.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ data:
1717
"httpTimeout": 70000000000,
1818
"filterVerb": "filter",
1919
"BindVerb": "bind",
20+
"preemptVerb": "preempt",
2021
"weight": 1,
2122
"enableHttps": false,
2223
"managedResources": [
@@ -27,4 +28,4 @@ data:
2728
]
2829
}
2930
]
30-
}
31+
}

0 commit comments

Comments
 (0)