Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClusterThrottles support PodGroup aka Gang #12

Merged
merged 10 commits into from
Sep 6, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ jobs:
- name: Validate GoReleaser
uses: goreleaser/goreleaser-action@v2
with:
args: release --snapshot --skip-publish --rm-dist --debug
args: release --snapshot --skip=publish --clean
3 changes: 2 additions & 1 deletion .goreleaser.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
version: 2
before:
hooks:
- go mod download
Expand Down Expand Up @@ -62,4 +63,4 @@ docker_manifests:

# change log will be generated by tagpr
changelog:
skip: true
disable: true
11 changes: 8 additions & 3 deletions pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
utam0k marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ClusterThrottleSpec struct {
Selector ClusterThrottleSelector `json:"selector,omitempty"`
}

func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, usedByGroup ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
threshold := thr.Spec.Threshold
if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
threshold = thr.Status.CalculatedThreshold.Threshold
Expand All @@ -46,11 +46,16 @@ func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAm
return CheckThrottleStatusActive
}

used := ResourceAmount{}.Add(thr.Status.Used).Add(ResourceAmountOfPod(pod)).Add(reservedResourceAmount)
if threshold.IsThrottled(used, isThrottledOnEqual).IsThrottledFor(pod) {
usedWithPod := alreadyUsed.Add(ResourceAmountOfPod(pod))
if threshold.IsThrottled(usedWithPod, isThrottledOnEqual).IsThrottledFor(pod) {
return CheckThrottleStatusInsufficient
}

usedWithGroup := usedWithPod.Add(usedByGroup)
if threshold.IsThrottled(usedWithGroup, isThrottledOnEqual).IsThrottledFor(pod) {
return CheckThrottleStatusInsufficientIncludingPodGroup
}

return CheckThrottleStatusNotThrottled
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/apis/schedule/v1alpha1/throttle_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,11 @@ type ThrottleStatus struct {
type CheckThrottleStatus string

var (
CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
CheckThrottleStatusActive CheckThrottleStatus = "active"
CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
CheckThrottleStatusPodRequestsExceedsThreshold CheckThrottleStatus = "pod-requests-exceeds-threshold"
CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
CheckThrottleStatusActive CheckThrottleStatus = "active"
CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
CheckThrottleStatusInsufficientIncludingPodGroup CheckThrottleStatus = "insufficient-including-pod-group"
CheckThrottleStatusPodRequestsExceedsThreshold CheckThrottleStatus = "pod-requests-exceeds-threshold"
)

func (thr Throttle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
Expand Down
66 changes: 64 additions & 2 deletions pkg/controllers/clusterthrottle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,27 +378,86 @@ func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *corev1.Pod,
func (c *ClusterThrottleController) CheckThrottled(
pod *corev1.Pod,
isThrottledOnEqual bool,
groupNameAnnotation string,
utam0k marked this conversation as resolved.
Show resolved Hide resolved
) (
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
error,
utam0k marked this conversation as resolved.
Show resolved Hide resolved
) {
throttles, err := c.affectedClusterThrottles(pod)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
affected := []schedulev1alpha1.ClusterThrottle{}
alreadyThrottled := []schedulev1alpha1.ClusterThrottle{}
insufficient := []schedulev1alpha1.ClusterThrottle{}
insufficientIncludingGroup := []schedulev1alpha1.ClusterThrottle{}
podRequestsExceedsThreshold := []schedulev1alpha1.ClusterThrottle{}

// Fetch the pods which have group name's annotation
var podGroup []*corev1.Pod
if groupNameAnnotation != "" {
groupName, isGroup := pod.Annotations[groupNameAnnotation]
if isGroup {
candidatePods, err := c.podInformer.Lister().Pods(pod.Namespace).List(labels.Everything())
if err != nil {
return nil, nil, nil, nil, nil, err
}

for _, candidatePod := range candidatePods {
if isScheduled(candidatePod) {
continue
}

if gn, ok := candidatePod.Annotations[groupNameAnnotation]; !ok || gn != groupName {
continue
}

// Don't count the scheduling pod itself.
if candidatePod.UID == pod.UID {
continue
}

podGroup = append(podGroup, candidatePod)
}
}
}

for _, thr := range throttles {
affected = append(affected, *thr)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})

usedInGroup := schedulev1alpha1.ResourceAmount{}
utam0k marked this conversation as resolved.
Show resolved Hide resolved
for _, groupPod := range podGroup {
ns, err := c.namespaceInformer.Lister().Get(groupPod.Namespace)
if err != nil {
return nil, nil, nil, nil, nil, err
}
Comment on lines +430 to +433

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All pods in a group have the same namespace, so these lines can be moved out of the for-loop

Copy link
Member Author

@utam0k utam0k Jul 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I don't assume the premise that all pods in a group are in the same namespace. This is because this assumption doesn't allow us to improve its implementation and performance that much better than the current implementation in this PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because this assumption doesn't allow us to improve its implementation and performance that much better than the current implementation in this PR.

Understood.
But, the current implementation assumes all pods in a group are in the same namespace, IIUC.

candidatePods, err := c.podInformer.Lister().Pods(pod.Namespace).List(labels.Everything())

We can modify the implementation to fetch the namespace for each pod in a group in the future when we remove this assumption?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I had missed it. Doing so would have meant exploring all the namespace pods, which would have had a huge impact on performance. I still think you are right, let's assume they are in the same namespace.


// If a pod of a group is already counted, skip it because it'll be counted as a reserved resrouce amount.
utam0k marked this conversation as resolved.
Show resolved Hide resolved
thrnn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name}
if c.cache.exist(thrnn, groupPod) {
continue
}

match, err := thr.Spec.Selector.MatchesToPod(groupPod, ns)
if err != nil {
return nil, nil, nil, nil, nil, err
}
if !match {
continue
}

usedInGroup = usedInGroup.Add(schedulev1alpha1.ResourceAmountOfPod(groupPod))
}

checkStatus := thr.CheckThrottledFor(
pod,
reservedAmt,
usedInGroup,
isThrottledOnEqual,
)
klog.V(3).InfoS("CheckThrottled result",
Expand All @@ -408,6 +467,7 @@ func (c *ClusterThrottleController) CheckThrottled(
"Threashold", thr.Status.CalculatedThreshold.Threshold,
"RequestedByPod", schedulev1alpha1.ResourceAmountOfPod(pod),
"UsedInClusterThrottle", thr.Status.Used,
"UsedInPodGroup", usedInGroup,
"ReservedAmountInScheduler", reservedAmt,
"ReservedPodsInScheduler", strings.Join(sets.List(reservedPodNNs), ","),
"AmountForCheck", schedulev1alpha1.ResourceAmount{}.Add(thr.Status.Used).Add(schedulev1alpha1.ResourceAmountOfPod(pod)).Add(reservedAmt),
Expand All @@ -417,11 +477,13 @@ func (c *ClusterThrottleController) CheckThrottled(
alreadyThrottled = append(alreadyThrottled, *thr)
case schedulev1alpha1.CheckThrottleStatusInsufficient:
insufficient = append(insufficient, *thr)
case schedulev1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup:
insufficientIncludingGroup = append(insufficientIncludingGroup, *thr)
case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
}
}
return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
return alreadyThrottled, insufficient, insufficientIncludingGroup, podRequestsExceedsThreshold, affected, nil
}

// mustSetupEventHandler sets up event handlers. If something wrong happens, it will panic.
Expand Down
15 changes: 15 additions & 0 deletions pkg/controllers/reserved_resource_amounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ func (c *reservedResourceAmounts) getPodResourceAmountMap(nn types.NamespacedNam
return c.cache[nn]
}

func (c *reservedResourceAmounts) exist(nn types.NamespacedName, pod *corev1.Pod) bool {
c.keyMutex.LockKey(nn.String())
defer func() {
_ = c.keyMutex.UnlockKey(nn.String())
}()

m := c.getPodResourceAmountMap(nn)
return m.exist(types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
}

func (c *reservedResourceAmounts) addPod(nn types.NamespacedName, pod *corev1.Pod) bool {
c.keyMutex.LockKey(nn.String())
defer func() {
Expand Down Expand Up @@ -145,6 +155,11 @@ func (c podResourceAmountMap) removeByNN(nn types.NamespacedName) bool {
return ok
}

func (c podResourceAmountMap) exist(nn types.NamespacedName) bool {
_, ok := c[nn]
return ok
}

func (c podResourceAmountMap) totalResoruceAmount() (schedulev1alpha1.ResourceAmount, sets.Set[string]) {
result := schedulev1alpha1.ResourceAmount{}
nns := sets.New[string]()
Expand Down
22 changes: 14 additions & 8 deletions pkg/scheduler_plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ const (
)

type KubeThrottler struct {
fh framework.Handle
throttleCtr *controllers.ThrottleController
clusterThrottleCtr *controllers.ClusterThrottleController
fh framework.Handle
throttleCtr *controllers.ThrottleController
clusterThrottleCtr *controllers.ClusterThrottleController
groupNameAnnotation string
}

var _ framework.PreFilterPlugin = &KubeThrottler{}
Expand Down Expand Up @@ -135,9 +136,10 @@ func NewPlugin(ctx context.Context, configuration runtime.Object, fh framework.H
}

pl := KubeThrottler{
fh: fh,
throttleCtr: throttleController,
clusterThrottleCtr: clusterthrottleController,
fh: fh,
throttleCtr: throttleController,
clusterThrottleCtr: clusterthrottleController,
groupNameAnnotation: kubeThrottlerArgs.GroupNameAnnotation,
}

return &pl, nil
Expand All @@ -160,20 +162,21 @@ func (pl *KubeThrottler) PreFilter(
"#AffectedThrottles", len(thrAffected),
)

clthrActive, clthrInsufficient, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
clthrActive, clthrInsufficient, clthrInsufficientGroup, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false, pl.groupNameAnnotation)
if err != nil {
return nil, framework.NewStatus(framework.Error, err.Error())
}
klog.V(2).InfoS("PreFilter: clusterthrottle check result",
"Pod", pod.Namespace+"/"+pod.Name,
"#ActiveClusterThrottles", len(clthrActive),
"#InsufficientClusterThrottles", len(clthrInsufficient),
"#InsufficientClusterThrottlesIncludingGroup", len(clthrInsufficientGroup),
"#PodRequestsExceedsThresholdClusterThrottles", len(clthrPodRequestsExceeds),
"#AffectedClusterThrottles", len(clThrAffected),
)

if len(thrActive)+len(thrInsufficient)+len(thrPodRequestsExceeds)+
len(clthrActive)+len(clthrInsufficient)+len(clthrPodRequestsExceeds) == 0 {
len(clthrActive)+len(clthrInsufficient)+len(clthrInsufficientGroup)+len(clthrPodRequestsExceeds) == 0 {
return nil, framework.NewStatus(framework.Success)
}

Expand Down Expand Up @@ -206,6 +209,9 @@ func (pl *KubeThrottler) PreFilter(
if len(clthrInsufficient) != 0 {
reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficient, strings.Join(clusterThrottleNames(clthrInsufficient), ",")))
}
if len(clthrInsufficientGroup) != 0 {
reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup, strings.Join(clusterThrottleNames(clthrInsufficientGroup), ",")))
}
if len(thrInsufficient) != 0 {
reasons = append(reasons, fmt.Sprintf("throttle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficient, strings.Join(throttleNames(thrInsufficient), ",")))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler_plugin/plugin_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type KubeThrottlerPluginArgs struct {
TargetSchedulerName string `json:"targetSchedulerName"`
ControllerThrediness int `json:"controllerThrediness"`
NumKeyMutex int `json:"numKeyMutex"`
GroupNameAnnotation string `json:"groupNameAnnotation"`
}

func DecodePluginArgs(configuration runtime.Object) (*KubeThrottlerPluginArgs, error) {
Expand Down
44 changes: 43 additions & 1 deletion test/integration/clusterthrottle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -161,8 +162,49 @@ var _ = Describe("Clusterthrottle Test", func() {
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
})
})
Context("Pod Group", func() {
utam0k marked this conversation as resolved.
Show resolved Hide resolved
var (
podPassedGroup []*corev1.Pod
podThrottledGroup []*corev1.Pod
)
BeforeEach(func() {
for i := 0; i < 2; i++ {
podPassedGroup = append(podPassedGroup, MustCreatePod(ctx, MakePod(DefaultNs, fmt.Sprintf("passed-pod%d", i), "50m").Annotation(groupNameAnnotation, "passed").Label(throttleKey, throttleName).Obj()))
}
for i := 0; i < 3; i++ {
podThrottledGroup = append(podThrottledGroup, MustCreatePod(ctx, MakePod(DefaultNs, fmt.Sprintf("throttled-pod%d", i), "50m").Annotation(groupNameAnnotation, "throttled").Label(throttleKey, throttleName).Obj()))
}
})
It("should not schedule pod3", func() {
Eventually(AsyncAll(
WakeupBackoffPod(ctx),
ClusterThottleHasStatus(
ctx, thr.Name,
ClthrOpts.WithCalculatedThreshold(thr.Spec.Threshold),
ClthrOpts.WithPodThrottled(true),
ClthrOpts.WithUsedPod(len(podPassedGroup)),
ClthrOpts.WithUsedCpuReq(fmt.Sprintf("%dm", len(podPassedGroup)*50)),
ClthrOpts.WithCpuThrottled(false),
),
func(g types.Gomega) {
for _, pod := range podPassedGroup {
PodIsScheduled(ctx, DefaultNs, pod.Name)
}
},
func(g types.Gomega) {
for _, pod := range podThrottledGroup {
MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup)
}
},
)).Should(Succeed())
Consistently(func(g types.Gomega) {
for _, pod := range podPassedGroup {
PodIsScheduled(ctx, DefaultNs, pod.Name)
ordovicia marked this conversation as resolved.
Show resolved Hide resolved
}
}).Should(Succeed())
})
})
})

When("Many pods are created at once", func() {
var thr *v1alpha1.ClusterThrottle
var scheduled = make([]*corev1.Pod, 20)
Expand Down
14 changes: 8 additions & 6 deletions test/integration/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ import (
)

const (
SchedulerName = "kube-throttler-integration"
ThrottlerName = "kube-throttler"
DefaultNs = "default"
SchedulerName = "kube-throttler-integration"
ThrottlerName = "kube-throttler"
DefaultNs = "default"
groupNameAnnotation = "scheduling.k8s.pfn.io/group-name"
)

var (
Expand Down Expand Up @@ -111,12 +112,13 @@ func mustStartKubeThrottler() {
name: %s
targetSchedulerName: %s
kubeconfig: %s
groupNameAnnotation: %s
controllerThrediness: 64
numKeyMutex: 128
`,
kubeConfigPath, // clientConnection.kubeconfig
SchedulerName, // prifiles[0].scedulerName
ThrottlerName, SchedulerName, kubeConfigPath, // profiles[0].pluginConfig[0].args
kubeConfigPath, // clientConnection.kubeconfig
SchedulerName, // prifiles[0].scedulerName
ThrottlerName, SchedulerName, kubeConfigPath, groupNameAnnotation, // profiles[0].pluginConfig[0].args
),
))
Expect(err).NotTo(HaveOccurred())
Expand Down
Loading