Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Commit

Permalink
Clean up the code. (#60)
Browse files Browse the repository at this point in the history
Signed-off-by: yanggang <[email protected]>
  • Loading branch information
Yang Gang authored May 10, 2023
1 parent c517f0c commit 397216b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/kube_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func kubeSchedulerCmd() *cobra.Command {
command := app.NewSchedulerCommand(
app.WithPlugin(kubethrottler.PluginName, kubethrottler.NewPlugin),
)
command.Short = "run kube-scheduler with kube-throttler plugin (need to enable 'KubeThrottler' plugin in config)"
command.Short = "Run kube-scheduler with kube-throttler plugin (need to enable 'KubeThrottler' plugin in config)"
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
Expand Down
26 changes: 12 additions & 14 deletions pkg/controllers/clusterthrottle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import (
"strings"
"time"

"github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
schedulev1alpha1 "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
scheduleclientset "github.com/everpeace/kube-throttler/pkg/generated/clientset/versioned"
scheduleinformer "github.com/everpeace/kube-throttler/pkg/generated/informers/externalversions/schedule/v1alpha1"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -223,7 +221,7 @@ func (c *ClusterThrottleController) shouldCountIn(pod *corev1.Pod) bool {
return pod.Spec.SchedulerName == c.targetSchedulerName && isScheduled(pod)
}

func (c *ClusterThrottleController) affectedPods(thr *schedulev1alpha1.ClusterThrottle) ([]*v1.Pod, []*v1.Pod, error) {
func (c *ClusterThrottleController) affectedPods(thr *schedulev1alpha1.ClusterThrottle) ([]*corev1.Pod, []*corev1.Pod, error) {
pods := []*corev1.Pod{}
nsMap := map[string]*corev1.Namespace{}
nss, err := c.namespaceInformer.Lister().List(labels.Everything())
Expand All @@ -248,8 +246,8 @@ func (c *ClusterThrottleController) affectedPods(thr *schedulev1alpha1.ClusterTh
pods = append(pods, podsInNs...)
}

nonterminatedPods := []*v1.Pod{}
terminatedPods := []*v1.Pod{}
nonterminatedPods := []*corev1.Pod{}
terminatedPods := []*corev1.Pod{}
for _, pod := range pods {
if !(c.shouldCountIn(pod)) {
continue
Expand All @@ -271,7 +269,7 @@ func (c *ClusterThrottleController) affectedPods(thr *schedulev1alpha1.ClusterTh
return nonterminatedPods, terminatedPods, nil
}

func (c *ClusterThrottleController) affectedClusterThrottles(pod *v1.Pod) ([]*schedulev1alpha1.ClusterThrottle, error) {
func (c *ClusterThrottleController) affectedClusterThrottles(pod *corev1.Pod) ([]*schedulev1alpha1.ClusterThrottle, error) {
ns, err := c.namespaceInformer.Lister().Get(pod.Namespace)
if err != nil {
return nil, err
Expand Down Expand Up @@ -299,7 +297,7 @@ func (c *ClusterThrottleController) affectedClusterThrottles(pod *v1.Pod) ([]*sc
return affectedClusterThrottles, nil
}

func (c *ClusterThrottleController) Reserve(pod *v1.Pod) error {
func (c *ClusterThrottleController) Reserve(pod *corev1.Pod) error {
throttles, err := c.affectedClusterThrottles(pod)
if err != nil {
return err
Expand All @@ -322,7 +320,7 @@ func (c *ClusterThrottleController) Reserve(pod *v1.Pod) error {
return nil
}

func (c *ClusterThrottleController) ReserveOnClusterThrottle(pod *v1.Pod, thr *schedulev1alpha1.ClusterThrottle) bool {
func (c *ClusterThrottleController) ReserveOnClusterThrottle(pod *corev1.Pod, thr *schedulev1alpha1.ClusterThrottle) bool {
nn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name}
added := c.cache.addPod(nn, pod)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(nn)
Expand All @@ -338,7 +336,7 @@ func (c *ClusterThrottleController) ReserveOnClusterThrottle(pod *v1.Pod, thr *s
return added
}

func (c *ClusterThrottleController) UnReserve(pod *v1.Pod) error {
func (c *ClusterThrottleController) UnReserve(pod *corev1.Pod) error {
throttles, err := c.affectedClusterThrottles(pod)
if err != nil {
return err
Expand All @@ -361,7 +359,7 @@ func (c *ClusterThrottleController) UnReserve(pod *v1.Pod) error {
return nil
}

func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *v1.Pod, thr *schedulev1alpha1.ClusterThrottle) bool {
func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *corev1.Pod, thr *schedulev1alpha1.ClusterThrottle) bool {
nn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name}
removed := c.cache.removePod(nn, pod)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(nn)
Expand All @@ -378,7 +376,7 @@ func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *v1.Pod, thr
}

func (c *ClusterThrottleController) CheckThrottled(
pod *v1.Pod,
pod *corev1.Pod,
isThrottledOnEqual bool,
) (
[]schedulev1alpha1.ClusterThrottle,
Expand Down Expand Up @@ -434,7 +432,7 @@ func (c *ClusterThrottleController) mustSetupEventHandler() {
}
_, err = c.clusterthrottleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
thr := obj.(*v1alpha1.ClusterThrottle)
thr := obj.(*schedulev1alpha1.ClusterThrottle)
if !c.isResponsibleFor(thr) {
return
}
Expand All @@ -443,15 +441,15 @@ func (c *ClusterThrottleController) mustSetupEventHandler() {
c.enqueue(thr)
},
UpdateFunc: func(oldObj, newObj interface{}) {
thr := newObj.(*v1alpha1.ClusterThrottle)
thr := newObj.(*schedulev1alpha1.ClusterThrottle)
if !c.isResponsibleFor(thr) {
return
}
klog.V(4).InfoS("Update event", "ClusterThrottle", thr.Name)
c.enqueue(thr)
},
DeleteFunc: func(obj interface{}) {
thr := obj.(*v1alpha1.ClusterThrottle)
thr := obj.(*schedulev1alpha1.ClusterThrottle)
if !c.isResponsibleFor(thr) {
return
}
Expand Down
26 changes: 12 additions & 14 deletions pkg/controllers/throttle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import (
"strings"
"time"

"github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
schedulev1alpha1 "github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
scheduleclientset "github.com/everpeace/kube-throttler/pkg/generated/clientset/versioned"
scheduleinformer "github.com/everpeace/kube-throttler/pkg/generated/informers/externalversions/schedule/v1alpha1"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -220,14 +218,14 @@ func (c *ThrottleController) shouldCountIn(pod *corev1.Pod) bool {
return pod.Spec.SchedulerName == c.targetSchedulerName && isScheduled(pod)
}

func (c *ThrottleController) affectedPods(thr *schedulev1alpha1.Throttle) ([]*v1.Pod, []*v1.Pod, error) {
func (c *ThrottleController) affectedPods(thr *schedulev1alpha1.Throttle) ([]*corev1.Pod, []*corev1.Pod, error) {
pods, err := c.podInformer.Lister().Pods(thr.Namespace).List(labels.Everything())
if err != nil {
return nil, nil, err
}

nonterminatedPods := []*v1.Pod{}
terminatedPods := []*v1.Pod{}
nonterminatedPods := []*corev1.Pod{}
terminatedPods := []*corev1.Pod{}
for _, pod := range pods {
if !(c.shouldCountIn(pod)) {
continue
Expand All @@ -247,7 +245,7 @@ func (c *ThrottleController) affectedPods(thr *schedulev1alpha1.Throttle) ([]*v1
return nonterminatedPods, terminatedPods, nil
}

func (c *ThrottleController) affectedThrottles(pod *v1.Pod) ([]*schedulev1alpha1.Throttle, error) {
func (c *ThrottleController) affectedThrottles(pod *corev1.Pod) ([]*schedulev1alpha1.Throttle, error) {
throttles, err := c.throttleInformer.Lister().Throttles(pod.Namespace).List(labels.Everything())
if err != nil {
return nil, err
Expand All @@ -270,7 +268,7 @@ func (c *ThrottleController) affectedThrottles(pod *v1.Pod) ([]*schedulev1alpha1
return affectedThrottles, nil
}

func (c *ThrottleController) Reserve(pod *v1.Pod) error {
func (c *ThrottleController) Reserve(pod *corev1.Pod) error {
throttles, err := c.affectedThrottles(pod)
if err != nil {
return err
Expand All @@ -293,7 +291,7 @@ func (c *ThrottleController) Reserve(pod *v1.Pod) error {
return nil
}

func (c *ThrottleController) ReserveOnThrottle(pod *v1.Pod, thr *schedulev1alpha1.Throttle) bool {
func (c *ThrottleController) ReserveOnThrottle(pod *corev1.Pod, thr *schedulev1alpha1.Throttle) bool {
nn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name}
added := c.cache.addPod(nn, pod)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(nn)
Expand All @@ -309,7 +307,7 @@ func (c *ThrottleController) ReserveOnThrottle(pod *v1.Pod, thr *schedulev1alpha
return added
}

func (c *ThrottleController) UnReserve(pod *v1.Pod) error {
func (c *ThrottleController) UnReserve(pod *corev1.Pod) error {
throttles, err := c.affectedThrottles(pod)
if err != nil {
return err
Expand All @@ -332,7 +330,7 @@ func (c *ThrottleController) UnReserve(pod *v1.Pod) error {
return nil
}

func (c *ThrottleController) UnReserveOnThrottle(pod *v1.Pod, thr *schedulev1alpha1.Throttle) bool {
func (c *ThrottleController) UnReserveOnThrottle(pod *corev1.Pod, thr *schedulev1alpha1.Throttle) bool {
nn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name}
removed := c.cache.removePod(nn, pod)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(nn)
Expand All @@ -349,7 +347,7 @@ func (c *ThrottleController) UnReserveOnThrottle(pod *v1.Pod, thr *schedulev1alp
}

func (c *ThrottleController) CheckThrottled(
pod *v1.Pod,
pod *corev1.Pod,
isThrottledOnEqual bool,
) (
[]schedulev1alpha1.Throttle,
Expand Down Expand Up @@ -402,23 +400,23 @@ func (c *ThrottleController) CheckThrottled(
func (c *ThrottleController) mustSetupEventHandler() {
_, err := c.throttleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
thr := obj.(*v1alpha1.Throttle)
thr := obj.(*schedulev1alpha1.Throttle)
if !c.isResponsibleFor(thr) {
return
}
klog.V(4).InfoS("Add event", "Throttle", thr.Namespace+"/"+thr.Name)
c.enqueue(thr)
},
UpdateFunc: func(oldObj, newObj interface{}) {
thr := newObj.(*v1alpha1.Throttle)
thr := newObj.(*schedulev1alpha1.Throttle)
if !c.isResponsibleFor(thr) {
return
}
klog.V(4).InfoS("Update event", "Throttle", thr.Namespace+"/"+thr.Name)
c.enqueue(thr)
},
DeleteFunc: func(obj interface{}) {
thr := obj.(*v1alpha1.Throttle)
thr := obj.(*schedulev1alpha1.Throttle)
if !c.isResponsibleFor(thr) {
return
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/scheduler_plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func NewPlugin(configuration runtime.Object, fh framework.Handle) (framework.Plu
)

scheduleInformerFactory.Start(ctx.Done())
scheduleInformerFactory.WaitForCacheSync(ctx.Done())
syncResults := scheduleInformerFactory.WaitForCacheSync(ctx.Done())
for informer, ok := range syncResults {
if !ok {
Expand Down Expand Up @@ -249,7 +248,6 @@ func (pl *KubeThrottler) Unreserve(
if err != nil {
utilruntime.HandleError(errors.Wrapf(err, "Failed to unreserve pod %s/%s in ThrottleController", pod.Namespace, pod.Name))
}
pod.GetName()
err = pl.clusterThrottleCtr.UnReserve(pod)
if err != nil {
utilruntime.HandleError(errors.Wrapf(err, "Failed to unreserve pod %s/%s in ClusterThrottleController", pod.Namespace, pod.Name))
Expand Down

0 comments on commit 397216b

Please sign in to comment.