Commit

fixup! ClusterThrottles support PodGroup aka Gang
Co-authored-by: Hidehito Yabuuchi <[email protected]>
Signed-off-by: utam0k <[email protected]>
utam0k and ordovicia committed Jul 19, 2024
1 parent a5e2678 commit 2f48400
Showing 2 changed files with 58 additions and 53 deletions.
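In pkg/controllers/clusterthrottle_controller.go, the five anonymous return values of CheckThrottled become named results (alreadyThrottled, insufficient, insufficientIncludingGroup, podRequestsExceedsThreshold, affected), which removes the block of slice initializations at the top of the function; the per-group accumulator is renamed from usedInGroup to requestedByGroup along with the corresponding log key, and a comment typo (resrouce → resource) is fixed. In test/integration/clusterthrottle_test.go, the pod-group spec appears to be restructured from a Context("Pod Group") block into a When("Group Pods") block that creates its own ClusterThrottle in BeforeEach, raises the per-pod CPU request from 50m to 100m, and renames the It to "should not schedule podThrottledGroup".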
27 changes: 11 additions & 16 deletions pkg/controllers/clusterthrottle_controller.go
@@ -380,22 +380,17 @@ func (c *ClusterThrottleController) CheckThrottled(
     isThrottledOnEqual bool,
     groupNameAnnotation string,
 ) (
-    []schedulev1alpha1.ClusterThrottle,
-    []schedulev1alpha1.ClusterThrottle,
-    []schedulev1alpha1.ClusterThrottle,
-    []schedulev1alpha1.ClusterThrottle,
-    []schedulev1alpha1.ClusterThrottle,
-    error,
+    alreadyThrottled []schedulev1alpha1.ClusterThrottle,
+    insufficient []schedulev1alpha1.ClusterThrottle,
+    insufficientIncludingGroup []schedulev1alpha1.ClusterThrottle,
+    podRequestsExceedsThreshold []schedulev1alpha1.ClusterThrottle,
+    affected []schedulev1alpha1.ClusterThrottle,
+    _ error,
 ) {
     throttles, err := c.affectedClusterThrottles(pod)
     if err != nil {
         return nil, nil, nil, nil, nil, err
     }
-    affected := []schedulev1alpha1.ClusterThrottle{}
-    alreadyThrottled := []schedulev1alpha1.ClusterThrottle{}
-    insufficient := []schedulev1alpha1.ClusterThrottle{}
-    insufficientIncludingGroup := []schedulev1alpha1.ClusterThrottle{}
-    podRequestsExceedsThreshold := []schedulev1alpha1.ClusterThrottle{}
 
     // Fetch the pods which have group name's annotation
     var podGroup []*corev1.Pod
@@ -430,14 +425,14 @@ func (c *ClusterThrottleController) CheckThrottled(
         affected = append(affected, *thr)
         reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
 
-        usedInGroup := schedulev1alpha1.ResourceAmount{}
+        requestedByGroup := schedulev1alpha1.ResourceAmount{}
         for _, groupPod := range podGroup {
             ns, err := c.namespaceInformer.Lister().Get(groupPod.Namespace)
             if err != nil {
                 return nil, nil, nil, nil, nil, err
             }
 
-            // If a pod of a group is already counted, skip it because it'll be counted as a reserved resrouce amount.
+            // If a pod of a group is already counted, skip it because it'll be counted as a reserved resource amount.
             thrnn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name}
             if c.cache.exist(thrnn, groupPod) {
                 continue
@@ -451,13 +446,13 @@ func (c *ClusterThrottleController) CheckThrottled(
                 continue
             }
 
-            usedInGroup = usedInGroup.Add(schedulev1alpha1.ResourceAmountOfPod(groupPod))
+            requestedByGroup = requestedByGroup.Add(schedulev1alpha1.ResourceAmountOfPod(groupPod))
         }
 
         checkStatus := thr.CheckThrottledFor(
             pod,
             reservedAmt,
-            usedInGroup,
+            requestedByGroup,
             isThrottledOnEqual,
         )
         klog.V(3).InfoS("CheckThrottled result",
@@ -467,7 +462,7 @@ func (c *ClusterThrottleController) CheckThrottled(
             "Threashold", thr.Status.CalculatedThreshold.Threshold,
             "RequestedByPod", schedulev1alpha1.ResourceAmountOfPod(pod),
             "UsedInClusterThrottle", thr.Status.Used,
-            "UsedInPodGroup", usedInGroup,
+            "ReqeustedByPodGroup", requestedByGroup,
             "ReservedAmountInScheduler", reservedAmt,
             "ReservedPodsInScheduler", strings.Join(sets.List(reservedPodNNs), ","),
             "AmountForCheck", schedulev1alpha1.ResourceAmount{}.Add(thr.Status.Used).Add(schedulev1alpha1.ResourceAmountOfPod(pod)).Add(reservedAmt),
84 changes: 47 additions & 37 deletions test/integration/clusterthrottle_test.go
@@ -162,49 +162,59 @@ var _ = Describe("Clusterthrottle Test", func() {
Consistently(PodIsNotScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
})
})
Context("Pod Group", func() {
var (
podPassedGroup []*corev1.Pod
podThrottledGroup []*corev1.Pod
})

When("Group Pods", func() {
var (
podPassedGroup []*corev1.Pod
thr *v1alpha1.ClusterThrottle
podThrottledGroup []*corev1.Pod
)
BeforeEach(func() {
thr = MustCreateClusterThrottle(ctx,
MakeClusterThrottle(throttleName).Selector(DefaultNs, throttleKey, throttleName).
ThresholdPod(4).
ThresholdCpu("4").
Obj(),
)
BeforeEach(func() {
for i := 0; i < 2; i++ {
podPassedGroup = append(podPassedGroup, MustCreatePod(ctx, MakePod(DefaultNs, fmt.Sprintf("passed-pod%d", i), "50m").Annotation(groupNameAnnotation, "passed").Label(throttleKey, throttleName).Obj()))
}
for i := 0; i < 3; i++ {
podThrottledGroup = append(podThrottledGroup, MustCreatePod(ctx, MakePod(DefaultNs, fmt.Sprintf("throttled-pod%d", i), "50m").Annotation(groupNameAnnotation, "throttled").Label(throttleKey, throttleName).Obj()))
}
})
It("should not schedule pod3", func() {
Eventually(AsyncAll(
WakeupBackoffPod(ctx),
ClusterThottleHasStatus(
ctx, thr.Name,
ClthrOpts.WithCalculatedThreshold(thr.Spec.Threshold),
ClthrOpts.WithPodThrottled(true),
ClthrOpts.WithUsedPod(len(podPassedGroup)),
ClthrOpts.WithUsedCpuReq(fmt.Sprintf("%dm", len(podPassedGroup)*50)),
ClthrOpts.WithCpuThrottled(false),
),
func(g types.Gomega) {
for _, pod := range podPassedGroup {
PodIsScheduled(ctx, DefaultNs, pod.Name)
}
},
func(g types.Gomega) {
for _, pod := range podThrottledGroup {
MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup)
}
},
)).Should(Succeed())
Consistently(func(g types.Gomega) {

for i := 0; i < 2; i++ {
podPassedGroup = append(podPassedGroup, MustCreatePod(ctx, MakePod(DefaultNs, fmt.Sprintf("passed-pod%d", i), "100m").Annotation(groupNameAnnotation, "passed").Label(throttleKey, throttleName).Obj()))
}
for i := 0; i < 3; i++ {
podThrottledGroup = append(podThrottledGroup, MustCreatePod(ctx, MakePod(DefaultNs, fmt.Sprintf("throttled-pod%d", i), "100m").Annotation(groupNameAnnotation, "throttled").Label(throttleKey, throttleName).Obj()))
}
})
It("should not schedule podThrottledGroup", func() {
Eventually(AsyncAll(
WakeupBackoffPod(ctx),
ClusterThottleHasStatus(
ctx, thr.Name,
ClthrOpts.WithCalculatedThreshold(thr.Spec.Threshold),
ClthrOpts.WithUsedPod(len(podPassedGroup)),
ClthrOpts.WithUsedCpuReq(fmt.Sprintf("%dm", len(podPassedGroup)*100)),
ClthrOpts.WithPodThrottled(false),
ClthrOpts.WithCpuThrottled(false),
),
func(g types.Gomega) {
for _, pod := range podPassedGroup {
PodIsScheduled(ctx, DefaultNs, pod.Name)
}
}).Should(Succeed())
})
},
func(g types.Gomega) {
for _, pod := range podThrottledGroup {
MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup)
}
},
)).Should(Succeed())
Consistently(func(g types.Gomega) {
for _, pod := range podPassedGroup {
PodIsScheduled(ctx, DefaultNs, pod.Name)
}
}).Should(Succeed())
})
})

When("Many pods are created at once", func() {
var thr *v1alpha1.ClusterThrottle
var scheduled = make([]*corev1.Pod, 20)
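For the "Group Pods" case above, the expected outcome follows from the numbers in the spec: the throttle is capped at 4 pods and 4 CPU, so the two "passed" pods (2 × 100m) fit and the status should settle at 2 used pods and 200m used CPU with neither podThrottled nor cpuThrottled set. Admitting any pod of the three-pod "throttled" group would bring the group-inclusive pod count to 2 + 3 = 5, over the 4-pod threshold, so each of those pods is expected to keep failing scheduling with CheckThrottleStatusInsufficientIncludingPodGroup while the passed pods remain scheduled.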