Skip to content

Commit

Permalink
feat: Find the Orphan Service Plugin in k8s cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
lbbniu committed May 16, 2023
1 parent e5bcb70 commit 466e86a
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 18 deletions.
4 changes: 4 additions & 0 deletions deploy/craned/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ data:
acceptedResources:
- kind: Node
apiVersion: v1
- name: Service
acceptedResources:
- kind: Service
apiVersion: v1
---
apiVersion: v1
kind: ConfigMap
Expand Down
8 changes: 2 additions & 6 deletions pkg/prometheus-adapter/config_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,12 @@ func (pc *PrometheusAdapterConfigFetcher) Reconcile(ctx context.Context, req ctr
klog.V(4).Infof("Got prometheus adapter configmap %s", req.NamespacedName)

//get configmap content
cm := &corev1.ConfigMap{}
err := pc.Client.Get(ctx, req.NamespacedName, cm)
var cm corev1.ConfigMap
err := pc.Client.Get(ctx, req.NamespacedName, &cm)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if cm == nil {
return ctrl.Result{}, fmt.Errorf("get configmap %s/%s failed", req.NamespacedName.Namespace, req.NamespacedName.Name)
}

cfg, err := config.FromYAML([]byte(cm.Data[pc.AdapterConfigMapKey]))
if err != nil {
klog.Errorf("Got metricsDiscoveryConfig failed[%s] %v", pc.AdapterConfigMapName, err)
Expand Down
11 changes: 9 additions & 2 deletions pkg/recommendation/framework/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,20 @@ func RetrievePods(ctx *RecommendationContext) error {
return err
} else if ctx.Recommendation.Spec.TargetRef.Kind == "DaemonSet" {
var daemonSet appsv1.DaemonSet
err := ObjectConversion(ctx.Object, &daemonSet)
if err != nil {
if err := ObjectConversion(ctx.Object, &daemonSet); err != nil {
return err
}
pods, err := utils.GetDaemonSetPods(ctx.Client, ctx.Recommendation.Spec.TargetRef.Namespace, ctx.Recommendation.Spec.TargetRef.Name)
ctx.Pods = pods
return err
} else if ctx.Recommendation.Spec.TargetRef.Kind == "Service" {
var svc corev1.Service
if err := ObjectConversion(ctx.Object, &svc); err != nil {
return err
}
pods, err := utils.GetServicePods(ctx.Client, &svc)
ctx.Pods = pods
return err
} else {
pods, err := utils.GetPodsFromScale(ctx.Client, ctx.Scale)
ctx.Pods = pods
Expand Down
1 change: 1 addition & 0 deletions pkg/recommendation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
_ "github.com/gocrane/crane/pkg/recommendation/recommender/idlenode"
_ "github.com/gocrane/crane/pkg/recommendation/recommender/replicas"
_ "github.com/gocrane/crane/pkg/recommendation/recommender/resource"
_ "github.com/gocrane/crane/pkg/recommendation/recommender/service"
)

type RecommenderManager interface {
Expand Down
3 changes: 3 additions & 0 deletions pkg/recommendation/recommender/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ const (

// IdleNodeRecommender name
IdleNodeRecommender string = "IdleNode"

// ServiceRecommender name
ServiceRecommender string = "Service"
)
48 changes: 48 additions & 0 deletions pkg/recommendation/recommender/service/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package service

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/gocrane/crane/pkg/recommendation/framework"
)

// Filter out k8s resources that are not supported by the recommender.
func (s *ServiceRecommender) Filter(ctx *framework.RecommendationContext) error {
var err error

// filter resource that not match objectIdentity
if err = s.BaseRecommender.Filter(ctx); err != nil {
return err
}
var svc corev1.Service
if err = framework.ObjectConversion(ctx.Object, &svc); err != nil {
return err
}

if svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
return fmt.Errorf("service: %v type: %s is not a LoadBalancer", ctx.Object.GetName(), svc.Spec.Type)
}

// filter Endpoints not empty
var ep corev1.Endpoints
if err = ctx.Client.Get(ctx.Context, client.ObjectKeyFromObject(ctx.Object), &ep); client.IgnoreNotFound(err) != nil {
return err
}
for _, ss := range ep.Subsets {
if len(ss.Addresses) != 0 {
return fmt.Errorf("service: %v addresses: %v not empty", ctx.Object.GetName(), ss.Addresses)
}
if len(ss.NotReadyAddresses) != 0 {
return fmt.Errorf("service: %v NotReadyAddresses: %v not empty", ctx.Object.GetName(), ss.NotReadyAddresses)
}
}

if err = framework.RetrievePods(ctx); err != nil {
return err
}

return nil
}
10 changes: 10 additions & 0 deletions pkg/recommendation/recommender/service/observe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package service

import (
"github.com/gocrane/crane/pkg/recommendation/framework"
)

// Observe enhance the observability.
func (s *ServiceRecommender) Observe(ctx *framework.RecommendationContext) error {
return nil
}
87 changes: 87 additions & 0 deletions pkg/recommendation/recommender/service/prepare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package service

import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

"github.com/gocrane/crane/pkg/metricnaming"
"github.com/gocrane/crane/pkg/providers"
"github.com/gocrane/crane/pkg/recommendation/framework"
"github.com/gocrane/crane/pkg/utils"
)

const callerFormat = "ServiceRecommender-%s-%s"

// CheckDataProviders in PrePrepare phase, will create data source provider via your recommendation config.
func (s *ServiceRecommender) CheckDataProviders(ctx *framework.RecommendationContext) error {
if err := s.BaseRecommender.CheckDataProviders(ctx); err != nil {
return err
}

return nil
}

func (s *ServiceRecommender) CollectData(ctx *framework.RecommendationContext) error {
if len(ctx.Pods) == 0 {
return nil
}

var workloadRef *metav1.OwnerReference
for _, pod := range ctx.Pods {
workloadRef = utils.GetPodOwnerReference(ctx.Context, ctx.Client, &pod)
if workloadRef != nil {
break
}
}
if workloadRef == nil {
return fmt.Errorf("could not find all pod OwnerReferences for Service %s selector", ctx.Object.GetName())
}
podName := utils.GetPodNameReg(workloadRef.Name, workloadRef.Kind)

labelSelector := labels.SelectorFromSet(ctx.Identity.Labels)
caller := fmt.Sprintf(callerFormat, klog.KObj(ctx.Recommendation), ctx.Recommendation.UID)
timeNow := time.Now()
metricNamer := metricnaming.ResourceToGeneralMetricNamer(utils.GetWorkloadNetReceiveBytesExpression(podName), corev1.ResourceServices, labelSelector, caller)
if err := metricNamer.Validate(); err != nil {
return err
}
ctx.MetricNamer = metricNamer

// get pod net receive bytes
klog.Infof("%s: %s NetReceiveBytes %s", ctx.String(), s.Name(), ctx.MetricNamer.BuildUniqueKey())
tsList, err := ctx.DataProviders[providers.PrometheusDataSource].QueryTimeSeries(ctx.MetricNamer, timeNow.Add(-time.Hour*24*7), timeNow, time.Minute)
if err != nil {
return fmt.Errorf("%s query pod net receive bytes historic metrics failed: %v ", s.Name(), err)
}
if len(tsList) != 1 {
return fmt.Errorf("%s query pod net receive bytes historic metrics data is unexpected, List length is %d ", s.Name(), len(tsList))
}
ctx.AddInputValue(netReceiveBytesKey, tsList)

metricNamer = metricnaming.ResourceToGeneralMetricNamer(utils.GetWorkloadNetTransferBytesExpression(podName), corev1.ResourceServices, labelSelector, caller)
if err = metricNamer.Validate(); err != nil {
return err
}

// get pod net transfer bytes
klog.Infof("%s: %s NetTransferBytes %s", ctx.String(), s.Name(), ctx.MetricNamer.BuildUniqueKey())
tsList, err = ctx.DataProviders[providers.PrometheusDataSource].QueryTimeSeries(ctx.MetricNamer, timeNow.Add(-time.Hour*24*7), timeNow, time.Minute)
if err != nil {
return fmt.Errorf("%s query pod net transfer bytes historic metrics failed: %v ", s.Name(), err)
}
if len(tsList) != 1 {
return fmt.Errorf("%s query pod net transfer bytes historic metrics data is unexpected, List length is %d ", s.Name(), len(tsList))
}
ctx.AddInputValue(netTransferBytesKey, tsList)

return nil
}

func (s *ServiceRecommender) PostProcessing(ctx *framework.RecommendationContext) error {
return nil
}
52 changes: 52 additions & 0 deletions pkg/recommendation/recommender/service/recommend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package service

import (
"fmt"

"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/recommendation/framework"
)

func (s *ServiceRecommender) PreRecommend(ctx *framework.RecommendationContext) error {
return nil
}

func (s *ServiceRecommender) Recommend(ctx *framework.RecommendationContext) error {
if len(ctx.Pods) == 0 {
ctx.Recommendation.Status.Action = "Delete"
ctx.Recommendation.Status.Description = "It is a Orphan Service, Pod count is 0"
return nil
}

// check if pod net receive bytes lt config value
if netReceiveBytes := s.getMaxValue(s.netReceiveBytes, ctx.InputValue(netReceiveBytesKey)); netReceiveBytes > s.netReceiveBytes {
return fmt.Errorf("Service %s is not a Orphan Service, because the config value is %f, but the pod net receive bytes is %f ", ctx.Object.GetName(), s.netReceiveBytes, netReceiveBytes)
}

// check if pod net transfer bytes lt config value
if netTransferBytes := s.getMaxValue(s.netTransferBytes, ctx.InputValue(netTransferBytesKey)); netTransferBytes > s.netTransferBytes {
return fmt.Errorf("Service %s is not a Orphan Service, because the config value is %f, but the pod net transfer bytes is %f ", ctx.Object.GetName(), s.netTransferBytes, netTransferBytes)
}

ctx.Recommendation.Status.Action = "Delete"
ctx.Recommendation.Status.Description = "It is a Orphan Service, Pod net bytes low"
return nil
}

// Policy add some logic for result of recommend phase.
func (s *ServiceRecommender) Policy(ctx *framework.RecommendationContext) error {
return nil
}

func (s *ServiceRecommender) getMaxValue(configValue float64, ts []*common.TimeSeries) float64 {
if configValue == 0 {
return configValue
}
var maxValue float64
for _, ss := range ts[0].Samples {
if ss.Value > maxValue {
maxValue = ss.Value
}
}
return maxValue
}
51 changes: 51 additions & 0 deletions pkg/recommendation/recommender/service/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package service

import (
analysisv1alph1 "github.com/gocrane/api/analysis/v1alpha1"
"github.com/gocrane/crane/pkg/recommendation/config"
"github.com/gocrane/crane/pkg/recommendation/recommender"
"github.com/gocrane/crane/pkg/recommendation/recommender/apis"
"github.com/gocrane/crane/pkg/recommendation/recommender/base"
)

const (
netReceiveBytesKey = "pod-net-receive-bytes"
netTransferBytesKey = "pod-net-transfer-bytes"
)

var _ recommender.Recommender = &ServiceRecommender{}

type ServiceRecommender struct {
base.BaseRecommender
netReceiveBytes float64
netTransferBytes float64
}

func init() {
recommender.RegisterRecommenderProvider(recommender.ServiceRecommender, NewServiceRecommender)
}

func (s *ServiceRecommender) Name() string {
return recommender.ServiceRecommender
}

// NewServiceRecommender create a new service recommender.
func NewServiceRecommender(recommender apis.Recommender, recommendationRule analysisv1alph1.RecommendationRule) (recommender.Recommender, error) {
recommender = config.MergeRecommenderConfigFromRule(recommender, recommendationRule)

netReceiveBytes, err := recommender.GetConfigFloat(netReceiveBytesKey, 0)
if err != nil {
return nil, err
}

netTransferBytes, err := recommender.GetConfigFloat(netTransferBytesKey, 0)
if err != nil {
return nil, err
}

return &ServiceRecommender{
*base.NewBaseRecommender(recommender),
netReceiveBytes,
netTransferBytes,
}, nil
}
16 changes: 16 additions & 0 deletions pkg/utils/expression_prom_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,24 @@ const (
ContainerMemUsageExprTemplate = `container_memory_working_set_bytes{container!="POD",namespace="%s",pod=~"%s",container="%s"}`

CustomerExprTemplate = `sum(%s{%s})`

// 容器网络累积接收数据总量(单位:字节)
queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod=~"%s"}[1h])) by (namespace)`
// 容器网络累积传输数据总量(单位:字节)
queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod=~"%s"}[1h])) by (namespace)`
)

const (
PostRegMatchesPodDeployment = `[a-z0-9]+-[a-z0-9]{5}$`
PostRegMatchesPodReplicaset = `[a-z0-9]+$`
PostRegMatchesPodDaemonSet = `[a-z0-9]{5}$`
PostRegMatchesPodStatefulset = `[0-9]+$`
)

func GetPodNameReg(resourceName string, resourceType string) string {
switch resourceType {
case "DaemonSet":
return fmt.Sprintf("^%s-%s", resourceName, PostRegMatchesPodDaemonSet)
case "ReplicaSet":
return fmt.Sprintf("^%s-%s", resourceName, PostRegMatchesPodReplicaset)
case "Deployment":
Expand Down Expand Up @@ -108,3 +116,11 @@ func GetNodeCpuUsageUtilizationExpression(nodeName string) string {
func GetNodeMemUsageUtilizationExpression(nodeName string) string {
return fmt.Sprintf(NodeMemUsageUtilizationExprTemplate, nodeName, nodeName)
}

func GetWorkloadNetReceiveBytesExpression(podName string) string {
return fmt.Sprintf(queryFmtNetReceiveBytes, podName)
}

func GetWorkloadNetTransferBytesExpression(podName string) string {
return fmt.Sprintf(queryFmtNetTransferBytes, podName)
}
Loading

0 comments on commit 466e86a

Please sign in to comment.