diff --git a/cmd/kube_scheduler.go b/cmd/kube_scheduler.go index 2128164..8a826cd 100644 --- a/cmd/kube_scheduler.go +++ b/cmd/kube_scheduler.go @@ -29,7 +29,7 @@ func kubeSchedulerCmd() *cobra.Command { command := app.NewSchedulerCommand( app.WithPlugin(kubethrottler.PluginName, kubethrottler.NewPlugin), ) - command.Short = "run kube-scheduler with kube-throttler plugin (need to enable 'KubeThrottler' plugin in config)" + command.Short = "Run kube-scheduler with kube-throttler plugin (need to enable 'KubeThrottler' plugin in config)" // TODO: once we switch everything over to Cobra commands, we can go back to calling // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the // normalize func and add the go flag set by hand. diff --git a/pkg/controllers/clusterthrottle_controller.go b/pkg/controllers/clusterthrottle_controller.go index fff50b0..3b8f1ce 100644 --- a/pkg/controllers/clusterthrottle_controller.go +++ b/pkg/controllers/clusterthrottle_controller.go @@ -23,13 +23,11 @@ import ( "strings" "time" - "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1" schedulev1alpha1 "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1" scheduleclientset "github.com/everpeace/kube-throttler/pkg/generated/clientset/versioned" scheduleinformer "github.com/everpeace/kube-throttler/pkg/generated/informers/externalversions/schedule/v1alpha1" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -223,7 +221,7 @@ func (c *ClusterThrottleController) shouldCountIn(pod *corev1.Pod) bool { return pod.Spec.SchedulerName == c.targetSchedulerName && isScheduled(pod) } -func (c *ClusterThrottleController) affectedPods(thr *schedulev1alpha1.ClusterThrottle) ([]*v1.Pod, []*v1.Pod, error) { +func (c *ClusterThrottleController) affectedPods(thr *schedulev1alpha1.ClusterThrottle) ([]*corev1.Pod, []*corev1.Pod, error) { pods := []*corev1.Pod{} nsMap := map[string]*corev1.Namespace{} nss, err := c.namespaceInformer.Lister().List(labels.Everything()) @@ -248,8 +246,8 @@ func (c *ClusterThrottleController) affectedPods(thr *schedulev1alpha1.ClusterTh pods = append(pods, podsInNs...) } - nonterminatedPods := []*v1.Pod{} - terminatedPods := []*v1.Pod{} + nonterminatedPods := []*corev1.Pod{} + terminatedPods := []*corev1.Pod{} for _, pod := range pods { if !(c.shouldCountIn(pod)) { continue @@ -271,7 +269,7 @@ func (c *ClusterThrottleController) affectedPods(thr *schedulev1alpha1.ClusterTh return nonterminatedPods, terminatedPods, nil } -func (c *ClusterThrottleController) affectedClusterThrottles(pod *v1.Pod) ([]*schedulev1alpha1.ClusterThrottle, error) { +func (c *ClusterThrottleController) affectedClusterThrottles(pod *corev1.Pod) ([]*schedulev1alpha1.ClusterThrottle, error) { ns, err := c.namespaceInformer.Lister().Get(pod.Namespace) if err != nil { return nil, err @@ -299,7 +297,7 @@ func (c *ClusterThrottleController) affectedClusterThrottles(pod *v1.Pod) ([]*sc return affectedClusterThrottles, nil } -func (c *ClusterThrottleController) Reserve(pod *v1.Pod) error { +func (c *ClusterThrottleController) Reserve(pod *corev1.Pod) error { throttles, err := c.affectedClusterThrottles(pod) if err != nil { return err @@ -322,7 +320,7 @@ func (c *ClusterThrottleController) Reserve(pod *v1.Pod) error { return nil } -func (c *ClusterThrottleController) ReserveOnClusterThrottle(pod *v1.Pod, thr *schedulev1alpha1.ClusterThrottle) bool { +func (c *ClusterThrottleController) ReserveOnClusterThrottle(pod *corev1.Pod, thr *schedulev1alpha1.ClusterThrottle) bool { nn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name} added := c.cache.addPod(nn, pod) reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(nn) @@ -338,7 +336,7 @@ func (c *ClusterThrottleController) ReserveOnClusterThrottle(pod *v1.Pod, thr *s return added } -func (c *ClusterThrottleController) UnReserve(pod *v1.Pod) error { +func (c *ClusterThrottleController) UnReserve(pod *corev1.Pod) error { throttles, err := c.affectedClusterThrottles(pod) if err != nil { return err @@ -361,7 +359,7 @@ func (c *ClusterThrottleController) UnReserve(pod *v1.Pod) error { return nil } -func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *v1.Pod, thr *schedulev1alpha1.ClusterThrottle) bool { +func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *corev1.Pod, thr *schedulev1alpha1.ClusterThrottle) bool { nn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name} removed := c.cache.removePod(nn, pod) reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(nn) @@ -378,7 +376,7 @@ func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *v1.Pod, thr } func (c *ClusterThrottleController) CheckThrottled( - pod *v1.Pod, + pod *corev1.Pod, isThrottledOnEqual bool, ) ( []schedulev1alpha1.ClusterThrottle, @@ -434,7 +432,7 @@ func (c *ClusterThrottleController) mustSetupEventHandler() { } _, err = c.clusterthrottleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - thr := obj.(*v1alpha1.ClusterThrottle) + thr := obj.(*schedulev1alpha1.ClusterThrottle) if !c.isResponsibleFor(thr) { return } @@ -443,7 +441,7 @@ func (c *ClusterThrottleController) mustSetupEventHandler() { c.enqueue(thr) }, UpdateFunc: func(oldObj, newObj interface{}) { - thr := newObj.(*v1alpha1.ClusterThrottle) + thr := newObj.(*schedulev1alpha1.ClusterThrottle) if !c.isResponsibleFor(thr) { return } @@ -451,7 +449,7 @@ func (c *ClusterThrottleController) mustSetupEventHandler() { c.enqueue(thr) }, DeleteFunc: func(obj interface{}) { - thr := obj.(*v1alpha1.ClusterThrottle) + thr := obj.(*schedulev1alpha1.ClusterThrottle) if !c.isResponsibleFor(thr) { return } diff --git a/pkg/controllers/throttle_controller.go b/pkg/controllers/throttle_controller.go index c109f84..b24d4ac 100644 --- a/pkg/controllers/throttle_controller.go +++ b/pkg/controllers/throttle_controller.go @@ -23,13 +23,11 @@ import ( "strings" "time" - "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1" schedulev1alpha1 "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1" scheduleclientset "github.com/everpeace/kube-throttler/pkg/generated/clientset/versioned" scheduleinformer "github.com/everpeace/kube-throttler/pkg/generated/informers/externalversions/schedule/v1alpha1" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -220,14 +218,14 @@ func (c *ThrottleController) shouldCountIn(pod *corev1.Pod) bool { return pod.Spec.SchedulerName == c.targetSchedulerName && isScheduled(pod) } -func (c *ThrottleController) affectedPods(thr *schedulev1alpha1.Throttle) ([]*v1.Pod, []*v1.Pod, error) { +func (c *ThrottleController) affectedPods(thr *schedulev1alpha1.Throttle) ([]*corev1.Pod, []*corev1.Pod, error) { pods, err := c.podInformer.Lister().Pods(thr.Namespace).List(labels.Everything()) if err != nil { return nil, nil, err } - nonterminatedPods := []*v1.Pod{} - terminatedPods := []*v1.Pod{} + nonterminatedPods := []*corev1.Pod{} + terminatedPods := []*corev1.Pod{} for _, pod := range pods { if !(c.shouldCountIn(pod)) { continue @@ -247,7 +245,7 @@ func (c *ThrottleController) affectedPods(thr *schedulev1alpha1.Throttle) ([]*v1 return nonterminatedPods, terminatedPods, nil } -func (c *ThrottleController) affectedThrottles(pod *v1.Pod) ([]*schedulev1alpha1.Throttle, error) { +func (c *ThrottleController) affectedThrottles(pod *corev1.Pod) ([]*schedulev1alpha1.Throttle, error) { throttles, err := c.throttleInformer.Lister().Throttles(pod.Namespace).List(labels.Everything()) if err != nil { return nil, err @@ -270,7 +268,7 @@ func (c *ThrottleController) affectedThrottles(pod *v1.Pod) ([]*schedulev1alpha1 return affectedThrottles, nil } -func (c *ThrottleController) Reserve(pod *v1.Pod) error { +func (c *ThrottleController) Reserve(pod *corev1.Pod) error { throttles, err := c.affectedThrottles(pod) if err != nil { return err @@ -293,7 +291,7 @@ func (c *ThrottleController) Reserve(pod *v1.Pod) error { return nil } -func (c *ThrottleController) ReserveOnThrottle(pod *v1.Pod, thr *schedulev1alpha1.Throttle) bool { +func (c *ThrottleController) ReserveOnThrottle(pod *corev1.Pod, thr *schedulev1alpha1.Throttle) bool { nn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name} added := c.cache.addPod(nn, pod) reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(nn) @@ -309,7 +307,7 @@ func (c *ThrottleController) ReserveOnThrottle(pod *v1.Pod, thr *schedulev1alpha return added } -func (c *ThrottleController) UnReserve(pod *v1.Pod) error { +func (c *ThrottleController) UnReserve(pod *corev1.Pod) error { throttles, err := c.affectedThrottles(pod) if err != nil { return err @@ -332,7 +330,7 @@ func (c *ThrottleController) UnReserve(pod *v1.Pod) error { return nil } -func (c *ThrottleController) UnReserveOnThrottle(pod *v1.Pod, thr *schedulev1alpha1.Throttle) bool { +func (c *ThrottleController) UnReserveOnThrottle(pod *corev1.Pod, thr *schedulev1alpha1.Throttle) bool { nn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name} removed := c.cache.removePod(nn, pod) reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(nn) @@ -349,7 +347,7 @@ func (c *ThrottleController) UnReserveOnThrottle(pod *v1.Pod, thr *schedulev1alp } func (c *ThrottleController) CheckThrottled( - pod *v1.Pod, + pod *corev1.Pod, isThrottledOnEqual bool, ) ( []schedulev1alpha1.Throttle, @@ -402,7 +400,7 @@ func (c *ThrottleController) CheckThrottled( func (c *ThrottleController) mustSetupEventHandler() { _, err := c.throttleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - thr := obj.(*v1alpha1.Throttle) + thr := obj.(*schedulev1alpha1.Throttle) if !c.isResponsibleFor(thr) { return } @@ -410,7 +408,7 @@ func (c *ThrottleController) mustSetupEventHandler() { c.enqueue(thr) }, UpdateFunc: func(oldObj, newObj interface{}) { - thr := newObj.(*v1alpha1.Throttle) + thr := newObj.(*schedulev1alpha1.Throttle) if !c.isResponsibleFor(thr) { return } @@ -418,7 +416,7 @@ func (c *ThrottleController) mustSetupEventHandler() { c.enqueue(thr) }, DeleteFunc: func(obj interface{}) { - thr := obj.(*v1alpha1.Throttle) + thr := obj.(*schedulev1alpha1.Throttle) if !c.isResponsibleFor(thr) { return } diff --git a/pkg/scheduler_plugin/plugin.go b/pkg/scheduler_plugin/plugin.go index 0f013f1..e58f798 100644 --- a/pkg/scheduler_plugin/plugin.go +++ b/pkg/scheduler_plugin/plugin.go @@ -112,7 +112,6 @@ func NewPlugin(configuration runtime.Object, fh framework.Handle) (framework.Plu ) scheduleInformerFactory.Start(ctx.Done()) - scheduleInformerFactory.WaitForCacheSync(ctx.Done()) syncResults := scheduleInformerFactory.WaitForCacheSync(ctx.Done()) for informer, ok := range syncResults { if !ok { @@ -249,7 +248,6 @@ func (pl *KubeThrottler) Unreserve( if err != nil { utilruntime.HandleError(errors.Wrapf(err, "Failed to unreserve pod %s/%s in ThrottleController", pod.Namespace, pod.Name)) } - pod.GetName() err = pl.clusterThrottleCtr.UnReserve(pod) if err != nil { utilruntime.HandleError(errors.Wrapf(err, "Failed to unreserve pod %s/%s in ClusterThrottleController", pod.Namespace, pod.Name))