Skip to content

Commit

Permalink
Bugfix: avoid ext resource less than allocated
Browse files Browse the repository at this point in the history
  • Loading branch information
payall4u committed Jan 21, 2024
1 parent 5c18026 commit bf834b1
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewAgent(ctx context.Context,

if nodeResource := utilfeature.DefaultFeatureGate.Enabled(features.CraneNodeResource); nodeResource {
tspName := agent.CreateNodeResourceTsp()
nodeResourceManager, err := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceReserved, tspName, nodeInformer, tspInformer, stateCollector.NodeResourceChann)
nodeResourceManager, err := resource.NewNodeResourceManager(kubeClient, nodeName, nodeResourceReserved, tspName, nodeInformer, podInformer, tspInformer, stateCollector.NodeResourceChann)
if err != nil {
return agent, err
}
Expand Down
55 changes: 45 additions & 10 deletions pkg/resource/node_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package resource
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/labels"

Check failure on line 6 in pkg/resource/node_resource_manager.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
"math"
"strconv"
"time"
Expand Down Expand Up @@ -55,6 +56,9 @@ type NodeResourceManager struct {
nodeLister corelisters.NodeLister
nodeSynced cache.InformerSynced

podLister corelisters.PodLister
podSynced cache.InformerSynced

tspLister predictionlisters.TimeSeriesPredictionLister
tspSynced cache.InformerSynced

Expand All @@ -71,7 +75,7 @@ type NodeResourceManager struct {
tspName string
}

func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeResourceReserved map[string]string, tspName string, nodeInformer coreinformers.NodeInformer,
func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeResourceReserved map[string]string, tspName string, nodeInformer coreinformers.NodeInformer, podInformer coreinformers.PodInformer,
tspInformer predictionv1.TimeSeriesPredictionInformer, stateChann chan map[string][]common.TimeSeries) (*NodeResourceManager, error) {
reserveCpuPercent, err := utils.ParsePercentage(nodeResourceReserved[v1.ResourceCPU.String()])
if err != nil {
Expand All @@ -92,6 +96,8 @@ func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeRes
client: client,
nodeLister: nodeInformer.Lister(),
nodeSynced: nodeInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podSynced: podInformer.Informer().HasSynced,
tspLister: tspInformer.Lister(),
tspSynced: tspInformer.Informer().HasSynced,
recorder: recorder,
Expand All @@ -117,6 +123,7 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) {
stop,
o.tspSynced,
o.nodeSynced,
o.podSynced,
) {
return
}
Expand Down Expand Up @@ -144,7 +151,11 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) {
}

func (o *NodeResourceManager) UpdateNodeResource() {
node := o.getNode()
node, err := o.getNode()
if err != nil {
klog.ErrorS(err, "Get node failed")
return
}
if len(node.Status.Addresses) == 0 {
klog.Error("Node addresses is empty")
return
Expand All @@ -168,13 +179,33 @@ func (o *NodeResourceManager) UpdateNodeResource() {
}
}

func (o *NodeResourceManager) getNode() *v1.Node {
node, err := o.nodeLister.Get(o.nodeName)
// getNode looks up the node named o.nodeName through the node lister and
// returns it together with any lookup error, leaving error handling to the
// caller.
func (o *NodeResourceManager) getNode() (*v1.Node, error) {
	node, err := o.nodeLister.Get(o.nodeName)
	return node, err
}

// getExtResourceAllocated returns the total amount of the resource named
// extResource that is requested by the Running pods known to the pod lister.
//
// Each pod contributes its effective request for the resource: the larger of
// the sum of its regular containers' requests and the largest single
// init-container request. This mirrors how Kubernetes computes a pod's
// effective request, since init containers run one at a time before the
// regular containers start.
//
// Pods in any phase other than Running are ignored. An error is returned only
// when listing pods fails, in which case the returned amount is 0.
//
// NOTE(review): every pod returned by the lister is counted; this presumably
// relies on the pod informer being scoped to this node — confirm at the
// informer's construction site.
func (o *NodeResourceManager) getExtResourceAllocated(extResource string) (float64, error) {
	pods, err := o.podLister.List(labels.Everything())
	if err != nil {
		return 0, err
	}

	name := v1.ResourceName(extResource)
	// requested reads a single container's request for the resource; a
	// container that does not request it yields 0.
	requested := func(container *v1.Container) float64 {
		return float64(container.Resources.Requests.Name(name, resource.BinarySI).Value())
	}

	allocated := 0.0
	for _, pod := range pods {
		if pod.Status.Phase != v1.PodRunning {
			continue
		}
		podRequest := 0.0
		// Index into the slices instead of taking the address of the range
		// variable, so each call sees the actual slice element.
		for i := range pod.Spec.Containers {
			podRequest += requested(&pod.Spec.Containers[i])
		}
		// Init containers do not add up with each other or with the regular
		// containers; only the largest one can raise the pod's request.
		for i := range pod.Spec.InitContainers {
			podRequest = math.Max(podRequest, requested(&pod.Spec.InitContainers[i]))
		}
		allocated += podRequest
	}
	return allocated, nil
}

func (o *NodeResourceManager) FindTargetNode(tsp *predictionapi.TimeSeriesPrediction, addresses []v1.NodeAddress) (bool, error) {
Expand Down Expand Up @@ -238,11 +269,15 @@ func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[string]int64 {
default:
continue
}
if nextRecommendation < 0 {
nextRecommendation = 0
extResourceName := fmt.Sprintf(utils.ExtResourcePrefixFormat, string(resourceName))
extResourceAllocated, err := o.getExtResourceAllocated(extResourceName)
if err != nil {
klog.Warningf("Get allocated ext resources %s failed: %s", extResourceName, err.Error())
}
if nextRecommendation < extResourceAllocated {
nextRecommendation = extResourceAllocated
}
metrics.UpdateNodeResourceRecommendedValue(metrics.SubComponentNodeResource, metrics.StepGetExtResourceRecommended, string(resourceName), resourceFrom, nextRecommendation)
extResourceName := fmt.Sprintf(utils.ExtResourcePrefixFormat, string(resourceName))
resValue, exists := node.Status.Capacity[v1.ResourceName(extResourceName)]
if exists && resValue.Value() != 0 &&
math.Abs(float64(resValue.Value())-
Expand Down
File renamed without changes.

0 comments on commit bf834b1

Please sign in to comment.