Skip to content

Commit

Permalink
Merge pull request #779 from lbbniu/feat/lbbniu/service
Browse files Browse the repository at this point in the history
feat: Find the Orphan Service Plugin in k8s cluster
  • Loading branch information
qmhu committed May 26, 2023
2 parents 019e79a + 63b106e commit 653f05d
Show file tree
Hide file tree
Showing 17 changed files with 375 additions and 28 deletions.
4 changes: 4 additions & 0 deletions deploy/craned/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ data:
acceptedResources:
- kind: PersistentVolume
apiVersion: v1
- name: Service
acceptedResources:
- kind: Service
apiVersion: v1
---
apiVersion: v1
kind: ConfigMap
Expand Down
8 changes: 2 additions & 6 deletions pkg/prometheus-adapter/config_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,12 @@ func (pc *PrometheusAdapterConfigFetcher) Reconcile(ctx context.Context, req ctr
klog.V(4).Infof("Got prometheus adapter configmap %s", req.NamespacedName)

//get configmap content
cm := &corev1.ConfigMap{}
err := pc.Client.Get(ctx, req.NamespacedName, cm)
var cm corev1.ConfigMap
err := pc.Client.Get(ctx, req.NamespacedName, &cm)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if cm == nil {
return ctrl.Result{}, fmt.Errorf("get configmap %s/%s failed", req.NamespacedName.Namespace, req.NamespacedName.Name)
}

cfg, err := config.FromYAML([]byte(cm.Data[pc.AdapterConfigMapKey]))
if err != nil {
klog.Errorf("Got metricsDiscoveryConfig failed[%s] %v", pc.AdapterConfigMapName, err)
Expand Down
11 changes: 9 additions & 2 deletions pkg/recommendation/framework/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,20 @@ func RetrievePods(ctx *RecommendationContext) error {
return err
} else if ctx.Recommendation.Spec.TargetRef.Kind == "DaemonSet" {
var daemonSet appsv1.DaemonSet
err := ObjectConversion(ctx.Object, &daemonSet)
if err != nil {
if err := ObjectConversion(ctx.Object, &daemonSet); err != nil {
return err
}
pods, err := utils.GetDaemonSetPods(ctx.Client, ctx.Recommendation.Spec.TargetRef.Namespace, ctx.Recommendation.Spec.TargetRef.Name)
ctx.Pods = pods
return err
} else if ctx.Recommendation.Spec.TargetRef.Kind == "Service" {
var svc corev1.Service
if err := ObjectConversion(ctx.Object, &svc); err != nil {
return err
}
pods, err := utils.GetServicePods(ctx.Client, &svc)
ctx.Pods = pods
return err
} else {
pods, err := utils.GetPodsFromScale(ctx.Client, ctx.Scale)
ctx.Pods = pods
Expand Down
1 change: 1 addition & 0 deletions pkg/recommendation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
_ "github.com/gocrane/crane/pkg/recommendation/recommender/idlenode"
_ "github.com/gocrane/crane/pkg/recommendation/recommender/replicas"
_ "github.com/gocrane/crane/pkg/recommendation/recommender/resource"
_ "github.com/gocrane/crane/pkg/recommendation/recommender/service"
)

type RecommenderManager interface {
Expand Down
3 changes: 3 additions & 0 deletions pkg/recommendation/recommender/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ const (

// VolumesRecommender name
VolumesRecommender string = "Volumes"

// ServiceRecommender name
ServiceRecommender string = "Service"
)
48 changes: 48 additions & 0 deletions pkg/recommendation/recommender/service/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package service

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/gocrane/crane/pkg/recommendation/framework"
)

// Filter out k8s resources that are not supported by the recommender.
func (s *ServiceRecommender) Filter(ctx *framework.RecommendationContext) error {
var err error

// filter resource that not match objectIdentity
if err = s.BaseRecommender.Filter(ctx); err != nil {
return err
}
var svc corev1.Service
if err = framework.ObjectConversion(ctx.Object, &svc); err != nil {
return err
}

if svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
return fmt.Errorf("service: %v type: %s is not a LoadBalancer", ctx.Object.GetName(), svc.Spec.Type)
}

// filter Endpoints not empty
var ep corev1.Endpoints
if err = ctx.Client.Get(ctx.Context, client.ObjectKeyFromObject(ctx.Object), &ep); client.IgnoreNotFound(err) != nil {
return err
}
for _, ss := range ep.Subsets {
if len(ss.Addresses) != 0 {
return fmt.Errorf("service: %v addresses: %v not empty", ctx.Object.GetName(), ss.Addresses)
}
if len(ss.NotReadyAddresses) != 0 {
return fmt.Errorf("service: %v NotReadyAddresses: %v not empty", ctx.Object.GetName(), ss.NotReadyAddresses)
}
}

if err = framework.RetrievePods(ctx); err != nil {
return err
}

return nil
}
10 changes: 10 additions & 0 deletions pkg/recommendation/recommender/service/observe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package service

import (
"github.com/gocrane/crane/pkg/recommendation/framework"
)

// Observe enhance the observability.
func (s *ServiceRecommender) Observe(ctx *framework.RecommendationContext) error {
return nil
}
87 changes: 87 additions & 0 deletions pkg/recommendation/recommender/service/prepare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package service

import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

"github.com/gocrane/crane/pkg/metricnaming"
"github.com/gocrane/crane/pkg/providers"
"github.com/gocrane/crane/pkg/recommendation/framework"
"github.com/gocrane/crane/pkg/utils"
)

const callerFormat = "ServiceRecommender-%s-%s"

// CheckDataProviders in PrePrepare phase, will create data source provider via your recommendation config.
func (s *ServiceRecommender) CheckDataProviders(ctx *framework.RecommendationContext) error {
if err := s.BaseRecommender.CheckDataProviders(ctx); err != nil {
return err
}

return nil
}

func (s *ServiceRecommender) CollectData(ctx *framework.RecommendationContext) error {
if len(ctx.Pods) == 0 {
return nil
}

var workloadRef *metav1.OwnerReference
for _, pod := range ctx.Pods {
workloadRef = utils.GetPodOwnerReference(ctx.Context, ctx.Client, &pod)
if workloadRef != nil {
break
}
}
if workloadRef == nil {
return fmt.Errorf("could not find all pod OwnerReferences for Service %s selector", ctx.Object.GetName())
}
podName := utils.GetPodNameReg(workloadRef.Name, workloadRef.Kind)

labelSelector := labels.SelectorFromSet(ctx.Identity.Labels)
caller := fmt.Sprintf(callerFormat, klog.KObj(ctx.Recommendation), ctx.Recommendation.UID)
timeNow := time.Now()
metricNamer := metricnaming.ResourceToGeneralMetricNamer(utils.GetWorkloadNetReceiveBytesExpression(podName), corev1.ResourceServices, labelSelector, caller)
if err := metricNamer.Validate(); err != nil {
return err
}
ctx.MetricNamer = metricNamer

// get pod net receive bytes
klog.Infof("%s: %s NetReceiveBytes %s", ctx.String(), s.Name(), ctx.MetricNamer.BuildUniqueKey())
tsList, err := ctx.DataProviders[providers.PrometheusDataSource].QueryTimeSeries(ctx.MetricNamer, timeNow.Add(-time.Hour*24*7), timeNow, time.Minute)
if err != nil {
return fmt.Errorf("%s query pod net receive bytes historic metrics failed: %v ", s.Name(), err)
}
if len(tsList) != 1 {
return fmt.Errorf("%s query pod net receive bytes historic metrics data is unexpected, List length is %d ", s.Name(), len(tsList))
}
ctx.AddInputValue(netReceiveBytesKey, tsList)

metricNamer = metricnaming.ResourceToGeneralMetricNamer(utils.GetWorkloadNetTransferBytesExpression(podName), corev1.ResourceServices, labelSelector, caller)
if err = metricNamer.Validate(); err != nil {
return err
}

// get pod net transfer bytes
klog.Infof("%s: %s NetTransferBytes %s", ctx.String(), s.Name(), ctx.MetricNamer.BuildUniqueKey())
tsList, err = ctx.DataProviders[providers.PrometheusDataSource].QueryTimeSeries(ctx.MetricNamer, timeNow.Add(-time.Hour*24*7), timeNow, time.Minute)
if err != nil {
return fmt.Errorf("%s query pod net transfer bytes historic metrics failed: %v ", s.Name(), err)
}
if len(tsList) != 1 {
return fmt.Errorf("%s query pod net transfer bytes historic metrics data is unexpected, List length is %d ", s.Name(), len(tsList))
}
ctx.AddInputValue(netTransferBytesKey, tsList)

return nil
}

func (s *ServiceRecommender) PostProcessing(ctx *framework.RecommendationContext) error {
return nil
}
79 changes: 79 additions & 0 deletions pkg/recommendation/recommender/service/recommend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package service

import (
"fmt"

"github.com/montanaflynn/stats"

"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/recommendation/framework"
)

func (s *ServiceRecommender) PreRecommend(ctx *framework.RecommendationContext) error {
return nil
}

func (s *ServiceRecommender) Recommend(ctx *framework.RecommendationContext) error {
if len(ctx.Pods) == 0 {
ctx.Recommendation.Status.Action = "Delete"
ctx.Recommendation.Status.Description = "It is a Orphan Service, Pod count is 0"
return nil
}

// check if pod net receive bytes lt config value
if netReceiveBytes := s.getMaxValue(s.netReceiveBytes, ctx.InputValue(netReceiveBytesKey)); netReceiveBytes > s.netReceiveBytes {
return fmt.Errorf("Service %s is not a Orphan Service, because the config value is %f, but the net receive bytes is %f ", ctx.Object.GetName(), s.netReceiveBytes, netReceiveBytes)
}

// check if pod net transfer bytes lt config value
if netTransferBytes := s.getMaxValue(s.netTransferBytes, ctx.InputValue(netTransferBytesKey)); netTransferBytes > s.netTransferBytes {
return fmt.Errorf("Service %s is not a Orphan Service, because the config value is %f, but the net transfer bytes is %f ", ctx.Object.GetName(), s.netTransferBytes, netTransferBytes)
}

// check if pod net receive percentile lt config value
if netReceivePercentile := s.getPercentile(s.netTransferBytes, ctx.InputValue(netReceiveBytesKey)); netReceivePercentile > s.netReceivePercentile {
return fmt.Errorf("Service %s is not a Orphan Service, because the config value is %f, but the net receive percentile is %f ", ctx.Object.GetName(), s.netReceivePercentile, netReceivePercentile)
}

// check if pod net transfer percentile lt config value
if netTransferPercentile := s.getPercentile(s.netTransferBytes, ctx.InputValue(netTransferBytesKey)); netTransferPercentile > s.netTransferPercentile {
return fmt.Errorf("Service %s is not a Orphan Service, because the config value is %f, but the net transfer percentile is %f ", ctx.Object.GetName(), s.netTransferPercentile, netTransferPercentile)
}

ctx.Recommendation.Status.Action = "Delete"
ctx.Recommendation.Status.Description = "It is a Orphan Service, Pod net bytes low"
return nil
}

// Policy add some logic for result of recommend phase.
func (s *ServiceRecommender) Policy(ctx *framework.RecommendationContext) error {
return nil
}

func (s *ServiceRecommender) getMaxValue(configValue float64, ts []*common.TimeSeries) float64 {
if configValue == 0 {
return configValue
}
var maxValue float64
for _, ss := range ts[0].Samples {
if ss.Value > maxValue {
maxValue = ss.Value
}
}
return maxValue
}

func (s *ServiceRecommender) getPercentile(configValue float64, ts []*common.TimeSeries) float64 {
if configValue == 0 {
return configValue
}
var input stats.Float64Data
for _, ss := range ts[0].Samples {
input = append(input, ss.Value)
}
percentile, err := stats.Percentile(input, configValue)
if err != nil {
return configValue
}
return percentile
}
67 changes: 67 additions & 0 deletions pkg/recommendation/recommender/service/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package service

import (
analysisv1alph1 "github.com/gocrane/api/analysis/v1alpha1"
"github.com/gocrane/crane/pkg/recommendation/config"
"github.com/gocrane/crane/pkg/recommendation/recommender"
"github.com/gocrane/crane/pkg/recommendation/recommender/apis"
"github.com/gocrane/crane/pkg/recommendation/recommender/base"
)

const (
netReceiveBytesKey = "net-receive-bytes"
netTransferBytesKey = "net-transfer-bytes"
netReceivePercentileKey = "net-receive-percentile"
netTransferPercentileKey = "net-transfer-percentile"
)

var _ recommender.Recommender = &ServiceRecommender{}

type ServiceRecommender struct {
base.BaseRecommender
netReceiveBytes float64
netTransferBytes float64
netReceivePercentile float64
netTransferPercentile float64
}

func init() {
recommender.RegisterRecommenderProvider(recommender.ServiceRecommender, NewServiceRecommender)
}

func (s *ServiceRecommender) Name() string {
return recommender.ServiceRecommender
}

// NewServiceRecommender create a new service recommender.
func NewServiceRecommender(recommender apis.Recommender, recommendationRule analysisv1alph1.RecommendationRule) (recommender.Recommender, error) {
recommender = config.MergeRecommenderConfigFromRule(recommender, recommendationRule)

netReceiveBytes, err := recommender.GetConfigFloat(netReceiveBytesKey, 0)
if err != nil {
return nil, err
}

netTransferBytes, err := recommender.GetConfigFloat(netTransferBytesKey, 0)
if err != nil {
return nil, err
}

netReceivePercentile, err := recommender.GetConfigFloat(netReceivePercentileKey, 0)
if err != nil {
return nil, err
}

netTransferPercentile, err := recommender.GetConfigFloat(netTransferPercentileKey, 0)
if err != nil {
return nil, err
}

return &ServiceRecommender{
*base.NewBaseRecommender(recommender),
netReceiveBytes,
netTransferBytes,
netReceivePercentile,
netTransferPercentile,
}, nil
}
2 changes: 1 addition & 1 deletion pkg/recommendation/recommender/volumes/observe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import (
)

// Observe enhance the observability.
func (s *VolumesRecommender) Observe(ctx *framework.RecommendationContext) error {
func (vr *VolumesRecommender) Observe(ctx *framework.RecommendationContext) error {
return nil
}
8 changes: 4 additions & 4 deletions pkg/recommendation/recommender/volumes/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
)

// CheckDataProviders in PrePrepare phase, will create data source provider via your recommendation config.
func (rr *VolumesRecommender) CheckDataProviders(ctx *framework.RecommendationContext) error {
if err := rr.BaseRecommender.CheckDataProviders(ctx); err != nil {
func (vr *VolumesRecommender) CheckDataProviders(ctx *framework.RecommendationContext) error {
if err := vr.BaseRecommender.CheckDataProviders(ctx); err != nil {
return err
}

return nil
}

func (rr *VolumesRecommender) CollectData(ctx *framework.RecommendationContext) error {
func (vr *VolumesRecommender) CollectData(ctx *framework.RecommendationContext) error {
return nil
}

func (rr *VolumesRecommender) PostProcessing(ctx *framework.RecommendationContext) error {
func (vr *VolumesRecommender) PostProcessing(ctx *framework.RecommendationContext) error {
return nil
}
Loading

0 comments on commit 653f05d

Please sign in to comment.