
Commit 0a9a52c

BTMa-ucas and mabotao authored
Adding reschedule function in scheduler.go (#198)
Signed-off-by: tinyma123 <[email protected]>
Co-authored-by: mabotao <[email protected]>
1 parent 6d7f661 commit 0a9a52c

File tree

1 file changed

+214 -3 lines changed

pkg/scheduler/scheduler.go

Lines changed: 214 additions & 3 deletions
@@ -3,14 +3,17 @@ package scheduler
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
@@ -23,6 +26,7 @@ import (
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
 	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
 	informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
+	clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
 	policylister "github.com/karmada-io/karmada/pkg/generated/listers/policy/v1alpha1"
 	worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha1"
 	schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
@@ -40,6 +44,26 @@ const (
 	maxRetries = 15
 )
 
+// ScheduleType defines the schedule type of a binding object should be performed.
+type ScheduleType string
+
+const (
+	// FirstSchedule means the binding object hasn't been scheduled.
+	FirstSchedule ScheduleType = "FirstSchedule"
+
+	// ReconcileSchedule means the binding object associated policy has been changed.
+	ReconcileSchedule ScheduleType = "ReconcileSchedule"
+
+	// FailoverSchedule means one of the cluster a binding object associated with becomes failure.
+	FailoverSchedule ScheduleType = "FailoverSchedule"
+
+	// AvoidSchedule means don't need to trigger scheduler.
+	AvoidSchedule ScheduleType = "AvoidSchedule"
+
+	// Unknown means can't detect the schedule type
+	Unknown ScheduleType = "Unknown"
+)
+
 // Failover indicates if the scheduler should performs re-scheduler in case of cluster failure.
 // TODO(RainbowMango): Remove the temporary solution by introducing feature flag
 var Failover bool
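
The commit itself never shows where this temporary Failover switch is set. Purely as an illustrative assumption (the flag name and entry point below are hypothetical, not part of this change), it could be wired from a boolean command-line flag in the scheduler's main package:

// Hypothetical wiring for the temporary Failover switch; not part of commit 0a9a52c.
package main

import (
	"flag"

	"github.com/karmada-io/karmada/pkg/scheduler"
)

func main() {
	// Assumed flag name; the real karmada-scheduler entry point may differ.
	failover := flag.Bool("failover", false, "enable rescheduling of bindings when a member cluster becomes not ready")
	flag.Parse()

	scheduler.Failover = *failover
	// ... construct the Scheduler via scheduler.NewScheduler(...) and run it ...
}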
@@ -57,6 +81,7 @@ type Scheduler struct {
 	clusterBindingLister  worklister.ClusterResourceBindingLister
 	clusterPolicyInformer cache.SharedIndexInformer
 	clusterPolicyLister   policylister.ClusterPropagationPolicyLister
+	clusterLister         clusterlister.ClusterLister
 	informerFactory       informerfactory.SharedInformerFactory
 
 	// TODO: implement a priority scheduling queue
@@ -77,6 +102,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
 	clusterBindingLister := factory.Work().V1alpha1().ClusterResourceBindings().Lister()
 	clusterPolicyInformer := factory.Policy().V1alpha1().ClusterPropagationPolicies().Informer()
 	clusterPolicyLister := factory.Policy().V1alpha1().ClusterPropagationPolicies().Lister()
+	clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
 	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
 	schedulerCache := schedulercache.NewCache()
 	// TODO: make plugins as a flag
@@ -93,6 +119,7 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
 		clusterBindingLister:  clusterBindingLister,
 		clusterPolicyInformer: clusterPolicyInformer,
 		clusterPolicyLister:   clusterPolicyLister,
+		clusterLister:         clusterLister,
 		informerFactory:       factory,
 		queue:                 queue,
 		Algorithm:             algorithm,
@@ -222,16 +249,90 @@ func (s *Scheduler) worker() {
 	}
 }
 
+func (s *Scheduler) getScheduleType(key string) ScheduleType {
+	ns, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return Unknown
+	}
+
+	// ResourceBinding object
+	if len(ns) > 0 {
+		resourceBinding, err := s.bindingLister.ResourceBindings(ns).Get(name)
+		if errors.IsNotFound(err) {
+			return Unknown
+		}
+
+		if len(resourceBinding.Spec.Clusters) == 0 {
+			return FirstSchedule
+		}
+
+		policyNamespace := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNamespaceLabel)
+		policyName := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNameLabel)
+
+		policy, err := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
+		if err != nil {
+			return Unknown
+		}
+		placement, err := json.Marshal(policy.Spec.Placement)
+		if err != nil {
+			klog.Errorf("Failed to marshal placement of propagationPolicy %s/%s, error: %v", policy.Namespace, policy.Name, err)
+			return Unknown
+		}
+		policyPlacementStr := string(placement)
+
+		appliedPlacement := util.GetLabelValue(resourceBinding.Annotations, util.PolicyPlacementAnnotation)
+
+		if policyPlacementStr != appliedPlacement {
+			return ReconcileSchedule
+		}
+
+		clusters := s.schedulerCache.Snapshot().GetClusters()
+		for _, tc := range resourceBinding.Spec.Clusters {
+			bindedCluster := tc.Name
+			for _, c := range clusters {
+				if c.Cluster().Name == bindedCluster {
+					if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
+						return FailoverSchedule
+					}
+				}
+			}
+		}
+	} else { // ClusterResourceBinding
+		// TODO:
+		return Unknown
+	}
+
+	return AvoidSchedule
+}
+
 func (s *Scheduler) scheduleNext() bool {
 	key, shutdown := s.queue.Get()
 	if shutdown {
 		klog.Errorf("Fail to pop item from queue")
 		return false
 	}
 	defer s.queue.Done(key)
-	klog.Infof("Failover flag is: %v", Failover)
 
-	err := s.scheduleOne(key.(string))
+	var err error
+	switch s.getScheduleType(key.(string)) {
+	case FirstSchedule:
+		err = s.scheduleOne(key.(string))
+		klog.Infof("Start scheduling binding(%s)", key.(string))
+	case ReconcileSchedule: // share same logic with first schedule
+		err = s.scheduleOne(key.(string))
+		klog.Infof("Reschedule binding(%s) as placement changed", key.(string))
+	case FailoverSchedule:
+		if Failover {
+			err = s.rescheduleOne(key.(string))
+			klog.Infof("Reschedule binding(%s) as cluster failure", key.(string))
+		}
+	case AvoidSchedule:
+		klog.Infof("Don't need to schedule binding(%s)", key.(string))
+	default:
+		err = fmt.Errorf("unknow schedule type")
+		klog.Warningf("Failed to identify scheduler type for binding(%s)", key.(string))
+	}
+
 	s.handleErr(err, key)
 	return true
 }
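
getScheduleType relies on the shape of the workqueue key to tell ResourceBindings apart from ClusterResourceBindings: keys built by cache.MetaNamespaceKeyFunc are "namespace/name" for namespace-scoped objects and a bare "name" for cluster-scoped ones, which is why only the len(ns) > 0 branch above is handled so far. A minimal standalone sketch of that key handling (the binding names are made up for illustration):

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// Hypothetical keys as they would come off the scheduler's workqueue.
	for _, key := range []string{"default/nginx-rb", "cluster-wide-crb"} {
		ns, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			panic(err)
		}
		if len(ns) > 0 {
			fmt.Printf("%q -> ResourceBinding %q in namespace %q\n", key, name, ns)
		} else {
			fmt.Printf("%q -> ClusterResourceBinding %q (the TODO branch above)\n", key, name)
		}
	}
}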
@@ -351,7 +452,7 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
 	}
 
 	utilruntime.HandleError(err)
-	klog.V(2).Infof("Dropping propagationbinding %q out of the queue: %v", key, err)
+	klog.V(2).Infof("Dropping ResourceBinding %q out of the queue: %v", key, err)
 	s.queue.Forget(key)
 }

@@ -374,6 +475,16 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
 	}
 	klog.V(3).Infof("update event for cluster %s", newCluster.Name)
 	s.schedulerCache.UpdateCluster(newCluster)
+
+	// Check if cluster becomes failure
+	if meta.IsStatusConditionPresentAndEqual(newCluster.Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
+		klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, Failover)
+
+		if Failover { // Trigger reschedule on cluster failure only when flag is true.
+			s.expiredBindingInQueue(newCluster.Name)
+			return
+		}
+	}
 }
 
 func (s *Scheduler) deleteCluster(obj interface{}) {
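
The failover trigger in updateCluster is a single condition check on the cluster's status: when the Ready condition is present and explicitly False, and the Failover flag is on, every ResourceBinding bound to that cluster is re-queued via expiredBindingInQueue. A standalone sketch of that check, assuming standard metav1.Conditions ("Ready" stands in for clusterv1alpha1.ClusterConditionReady):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Example condition list for a cluster that has become unreachable.
	conditions := []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "ClusterNotReachable"},
	}

	// True only when the Ready condition exists and is explicitly False;
	// this is what gates the call to expiredBindingInQueue when Failover is enabled.
	notReady := meta.IsStatusConditionPresentAndEqual(conditions, "Ready", metav1.ConditionFalse)
	fmt.Println("cluster not ready:", notReady)
}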
@@ -395,3 +506,103 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
 	klog.V(3).Infof("delete event for cluster %s", cluster.Name)
 	s.schedulerCache.DeleteCluster(cluster)
 }
+
+// expiredBindingInQueue will find all ResourceBindings which are related to the current NotReady cluster and add them in queue.
+func (s *Scheduler) expiredBindingInQueue(notReadyClusterName string) {
+	bindings, _ := s.bindingLister.List(labels.Everything())
+	klog.Infof("Start traveling all ResourceBindings")
+	for _, binding := range bindings {
+		clusters := binding.Spec.Clusters
+		for _, bindingCluster := range clusters {
+			if bindingCluster.Name == notReadyClusterName {
+				rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
+				if err != nil {
+					klog.Errorf("couldn't get rescheduleKey for ResourceBinding %#v: %v", bindingCluster.Name, err)
+					return
+				}
+				s.queue.Add(rescheduleKey)
+				klog.Infof("Add expired ResourceBinding in queue successfully")
+			}
+		}
+	}
+}
+
+func (s Scheduler) failoverCandidateCluster(binding *workv1alpha1.ResourceBinding) (reserved sets.String, candidates sets.String) {
+	bindedCluster := sets.NewString()
+	for _, cluster := range binding.Spec.Clusters {
+		bindedCluster.Insert(cluster.Name)
+	}
+
+	availableCluster := sets.NewString()
+	for _, cluster := range s.schedulerCache.Snapshot().GetReadyClusters() {
+		availableCluster.Insert(cluster.Cluster().Name)
+	}
+
+	return bindedCluster.Difference(bindedCluster.Difference(availableCluster)), availableCluster.Difference(bindedCluster)
+}
+
+// rescheduleOne.
+func (s *Scheduler) rescheduleOne(key string) (err error) {
+	ns, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+
+	klog.Infof("begin rescheduling ResourceBinding %s %s", ns, name)
+	defer klog.Infof("end rescheduling ResourceBinding %s: %s", ns, name)
+
+	resourceBinding, err := s.bindingLister.ResourceBindings(ns).Get(name)
+	if errors.IsNotFound(err) {
+		return nil
+	}
+
+	binding := resourceBinding.DeepCopy()
+	reservedClusters, candidateClusters := s.failoverCandidateCluster(resourceBinding)
+	klog.Infof("Reserved clusters : %v", reservedClusters.List())
+	klog.Infof("Candidate clusters: %v", candidateClusters.List())
+	deltaLen := len(binding.Spec.Clusters) - len(reservedClusters)
+
+	klog.Infof("binding(%s/%s) has %d failure clusters, and got %d candidates", ns, name, deltaLen, len(candidateClusters))
+
+	// TODO: should schedule as much as possible?
+	if len(candidateClusters) < deltaLen {
+		klog.Warningf("ignore reschedule binding(%s/%s) as insufficient available cluster", ns, name)
+		return nil
+	}
+
+	targetClusters := reservedClusters
+
+	for i := 0; i < deltaLen; i++ {
+		for clusterName := range candidateClusters {
+			curCluster, _ := s.clusterLister.Get(clusterName)
+			policyNamespace := util.GetLabelValue(binding.Labels, util.PropagationPolicyNamespaceLabel)
+			policyName := util.GetLabelValue(binding.Labels, util.PropagationPolicyNameLabel)
+			policy, _ := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
+
+			if !util.ClusterMatches(curCluster, *policy.Spec.Placement.ClusterAffinity) {
+				continue
+			}
+
+			klog.Infof("Rescheduling %s/ %s to member cluster %s", binding.Namespace, binding.Name, clusterName)
+			targetClusters.Insert(clusterName)
+			candidateClusters.Delete(clusterName)
+
+			// break as soon as find a result
+			break
+		}
+	}
+
+	// TODO(tinyma123) Check if the final result meets the spread constraints.
+
+	binding.Spec.Clusters = nil
+	for cluster := range targetClusters {
+		binding.Spec.Clusters = append(binding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
+	}
+	klog.Infof("The final binding.Spec.Cluster values are: %v\n", binding.Spec.Clusters)
+
+	_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(ns).Update(context.TODO(), binding, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	return nil
+}
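
The set expression in failoverCandidateCluster is easy to misread: bindedCluster.Difference(bindedCluster.Difference(availableCluster)) is simply the intersection of the bound clusters and the currently ready clusters (the clusters worth keeping), while availableCluster.Difference(bindedCluster) is the pool of ready clusters the binding is not yet placed on. A standalone sketch with made-up cluster names:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	bound := sets.NewString("member1", "member2")            // clusters the binding is currently scheduled to
	ready := sets.NewString("member2", "member3", "member4") // clusters reported Ready by the scheduler cache

	reserved := bound.Difference(bound.Difference(ready)) // equivalent to bound ∩ ready -> {member2}
	candidates := ready.Difference(bound)                 // -> {member3, member4}

	fmt.Println("reserved:  ", reserved.List())   // clusters kept in the binding
	fmt.Println("candidates:", candidates.List()) // clusters rescheduleOne may fill the gap with
}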
