Skip to content

Commit

Permalink
Merge pull request #803 from qmhu/rr-refector
Browse files Browse the repository at this point in the history
Abandon recommendation missions
  • Loading branch information
qmhu authored Jun 6, 2023
2 parents 633444e + 7508e5e commit 92b5605
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 145 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/analytics/analytics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,10 @@ func ConvertToRecommendationRule(analytics *analysisv1alph1.Analytics) *analysis
recommendationRule.Spec.RunInterval = (time.Duration(*analytics.Spec.CompletionStrategy.PeriodSeconds) * time.Second).String()
}

recommendationRule.Status = analysisv1alph1.RecommendationRuleStatus{
/*recommendationRule.Status = analysisv1alph1.RecommendationRuleStatus{
LastUpdateTime: analytics.Status.LastUpdateTime,
Recommendations: analytics.Status.Recommendations,
}
}*/

return recommendationRule
}
Expand Down
260 changes: 134 additions & 126 deletions pkg/controller/recommendation/recommendation_rule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package recommendation
import (
"context"
"fmt"
"reflect"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -121,40 +121,6 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen
return false
}

timeNow := metav1.Now()

// if the first mission start time is last round, reset currMissions here
currMissions := newStatus.Recommendations
if currMissions != nil && len(currMissions) > 0 {
firstMissionStartTime := currMissions[0].LastStartTime
if firstMissionStartTime.IsZero() {
currMissions = nil
} else {
planingTime := firstMissionStartTime.Add(interval)
if time.Now().After(planingTime) {
currMissions = nil // reset missions to trigger creation for missions
}
}
}

if currMissions == nil {
// create recommendation rule missions for this round
// every recommendation rule have multi recommender for one identity
for _, id := range identities {
for _, recommender := range recommendationRule.Spec.Recommenders {
currMissions = append(currMissions, analysisv1alph1.RecommendationMission{
TargetRef: id.GetObjectReference(),
RecommenderRef: analysisv1alph1.Recommender{
Name: recommender.Name,
},
})
}
}

// +1 for runNumber
newStatus.RunNumber = newStatus.RunNumber + 1
}

var currRecommendations analysisv1alph1.RecommendationList
opts := []client.ListOption{
client.MatchingLabels(map[string]string{known.RecommendationRuleUidLabel: string(recommendationRule.UID)}),
Expand All @@ -174,20 +140,50 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen
return false
}

if klog.V(6).Enabled() {
// Print identities
for k, id := range identities {
klog.V(6).InfoS("identities", "RecommendationRule", klog.KObj(recommendationRule), "key", k, "apiVersion", id.APIVersion, "kind", id.Kind, "namespace", id.Namespace, "name", id.Name)
var identitiesArray []ObjectIdentity
keys := make([]string, 0, len(identities))
for k := range identities {
keys = append(keys, k)
}
sort.Strings(keys) // sort key to get a certain order
for _, key := range keys {
id := identities[key]
id.Recommendation = GetRecommendationFromIdentity(identities[key], currRecommendations)
identitiesArray = append(identitiesArray, id)
}

timeNow := metav1.Now()
newRound := false
if len(identitiesArray) > 0 {
firstRecommendation := identitiesArray[0].Recommendation
firstMissionStartTime, err := utils.GetLastStartTime(firstRecommendation)
if err != nil {
newRound = true
} else {
planingTime := firstMissionStartTime.Add(interval)
now := utils.NowUTC()
if now.After(planingTime) {
newRound = true
}
}
}

if newRound {
// +1 for runNumber
newStatus.RunNumber = newStatus.RunNumber + 1
}

maxConcurrency := 10
executionIndex := -1
var concurrency int
for index, mission := range currMissions {
if mission.LastStartTime != nil {
continue
for index, identity := range identitiesArray {
if identity.Recommendation != nil {
runNumber, _ := utils.GetRunNumber(identity.Recommendation)
if runNumber >= newStatus.RunNumber {
continue
}
}

if executionIndex == -1 {
executionIndex = index
}
Expand All @@ -198,22 +194,17 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen

wg := sync.WaitGroup{}
wg.Add(concurrency)
for index := executionIndex; index < len(currMissions) && index < concurrency+executionIndex; index++ {
var existingRecommendation *analysisv1alph1.Recommendation
for _, r := range currRecommendations.Items {
if reflect.DeepEqual(currMissions[index].TargetRef, r.Spec.TargetRef) && string(r.Spec.Type) == currMissions[index].RecommenderRef.Name {
existingRecommendation = &r
break
}
for index := executionIndex; index < len(identitiesArray) && index < concurrency+executionIndex; index++ {
if klog.V(6).Enabled() {
klog.V(6).InfoS("execute identities", "RecommendationRule", klog.KObj(recommendationRule), "target", identitiesArray[index].GetObjectReference())
}

go executeMission(ctx, &wg, c.RecommenderMgr, c.Provider, c.PredictorMgr, recommendationRule, identities, &currMissions[index], existingRecommendation, c.Client, c.ScaleClient, c.OOMRecorder, timeNow, newStatus.RunNumber)
go executeIdentity(ctx, &wg, c.RecommenderMgr, c.Provider, c.PredictorMgr, recommendationRule, identitiesArray[index], c.Client, c.ScaleClient, c.OOMRecorder, timeNow, newStatus.RunNumber)
}

wg.Wait()

finished := false
if executionIndex+concurrency == len(currMissions) || len(currMissions) == 0 {
if executionIndex+concurrency == len(identitiesArray) || len(identitiesArray) == 0 {
finished = true
}

Expand All @@ -222,8 +213,8 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen
// clean orphan recommendations
for _, recommendation := range currRecommendations.Items {
exist := false
for _, mission := range currMissions {
if recommendation.UID == mission.UID {
for _, id := range identitiesArray {
if recommendation.UID == id.Recommendation.UID {
exist = true
break
}
Expand All @@ -241,8 +232,6 @@ func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommen

}

newStatus.Recommendations = currMissions

updateRecommendationRuleStatus(ctx, c.Client, c.Recorder, recommendationRule, newStatus)
return finished
}
Expand Down Expand Up @@ -303,15 +292,18 @@ func (c *RecommendationRuleController) getIdentities(ctx context.Context, recomm
}

for i := range filterdUnstructureds {
k := objRefKey(rs.Kind, rs.APIVersion, filterdUnstructureds[i].GetNamespace(), filterdUnstructureds[i].GetName())
if _, exists := identities[k]; !exists {
identities[k] = ObjectIdentity{
Namespace: filterdUnstructureds[i].GetNamespace(),
Name: filterdUnstructureds[i].GetName(),
Kind: rs.Kind,
APIVersion: rs.APIVersion,
Labels: filterdUnstructureds[i].GetLabels(),
Object: filterdUnstructureds[i],
for _, recommender := range recommendationRule.Spec.Recommenders {
k := objRefKey(rs.Kind, rs.APIVersion, filterdUnstructureds[i].GetNamespace(), filterdUnstructureds[i].GetName(), recommender.Name)
if _, exists := identities[k]; !exists {
identities[k] = ObjectIdentity{
Namespace: filterdUnstructureds[i].GetNamespace(),
Name: filterdUnstructureds[i].GetName(),
Kind: rs.Kind,
APIVersion: rs.APIVersion,
Labels: filterdUnstructureds[i].GetLabels(),
Object: filterdUnstructureds[i],
Recommender: recommender.Name,
}
}
}
}
Expand Down Expand Up @@ -351,12 +343,14 @@ func updateRecommendationRuleStatus(ctx context.Context, c client.Client, record
}

type ObjectIdentity struct {
Namespace string
APIVersion string
Kind string
Name string
Labels map[string]string
Object unstructuredv1.Unstructured
Namespace string
APIVersion string
Kind string
Name string
Labels map[string]string
Recommender string
Object unstructuredv1.Unstructured
Recommendation *analysisv1alph1.Recommendation
}

func (id ObjectIdentity) GetObjectReference() corev1.ObjectReference {
Expand All @@ -375,8 +369,22 @@ func newOwnerRef(a *analysisv1alph1.RecommendationRule) *metav1.OwnerReference {
}
}

func objRefKey(kind, apiVersion, namespace, name string) string {
return fmt.Sprintf("%s#%s#%s#%s", kind, apiVersion, namespace, name)
func objRefKey(kind, apiVersion, namespace, name, recommender string) string {
return fmt.Sprintf("%s#%s#%s#%s#%s", kind, apiVersion, namespace, name, recommender)
}

func GetRecommendationFromIdentity(id ObjectIdentity, currRecommendations analysisv1alph1.RecommendationList) *analysisv1alph1.Recommendation {
for _, r := range currRecommendations.Items {
if id.Kind == r.Spec.TargetRef.Kind &&
id.APIVersion == r.Spec.TargetRef.APIVersion &&
id.Namespace == r.Spec.TargetRef.Namespace &&
id.Name == r.Spec.TargetRef.Name &&
id.Recommender == string(r.Spec.Type) {
return &r
}
}

return nil
}

func CreateRecommendationObject(recommendationRule *analysisv1alph1.RecommendationRule,
Expand Down Expand Up @@ -404,6 +412,11 @@ func CreateRecommendationObject(recommendationRule *analysisv1alph1.Recommendati
},
}

recommendation.Labels = generateRecommendationLabels(recommendationRule, target, id, recommenderName)
return recommendation
}

func generateRecommendationLabels(recommendationRule *analysisv1alph1.RecommendationRule, target corev1.ObjectReference, id ObjectIdentity, recommenderName string) map[string]string {
labels := map[string]string{}
labels[known.RecommendationRuleNameLabel] = recommendationRule.Name
labels[known.RecommendationRuleUidLabel] = string(recommendationRule.UID)
Expand All @@ -414,87 +427,82 @@ func CreateRecommendationObject(recommendationRule *analysisv1alph1.Recommendati
labels[known.RecommendationRuleTargetKindLabel] = target.Kind
labels[known.RecommendationRuleTargetVersionLabel] = target.GroupVersionKind().Version
labels[known.RecommendationRuleTargetNameLabel] = target.Name
labels[known.RecommendationRuleTargetNamespaceLabel] = target.Namespace
for k, v := range id.Labels {
labels[k] = v
}

recommendation.Labels = labels
return recommendation
return labels
}

func executeMission(ctx context.Context, wg *sync.WaitGroup, recommenderMgr recommender.RecommenderManager, provider providers.History, predictorMgr predictormgr.Manager,
recommendationRule *analysisv1alph1.RecommendationRule, identities map[string]ObjectIdentity, mission *analysisv1alph1.RecommendationMission,
existingRecommendation *analysisv1alph1.Recommendation, client client.Client, scaleClient scale.ScalesGetter, oomRecorder oom.Recorder, timeNow metav1.Time, currentRunNumber int32) {
func executeIdentity(ctx context.Context, wg *sync.WaitGroup, recommenderMgr recommender.RecommenderManager, provider providers.History, predictorMgr predictormgr.Manager,
recommendationRule *analysisv1alph1.RecommendationRule, id ObjectIdentity, client client.Client, scaleClient scale.ScalesGetter, oomRecorder oom.Recorder, timeNow metav1.Time, currentRunNumber int32) {
defer func() {
mission.LastStartTime = &timeNow
klog.Infof("Mission message: %s", mission.Message)
if wg != nil {
wg.Done()
}
}()
var message string

k := objRefKey(mission.TargetRef.Kind, mission.TargetRef.APIVersion, mission.TargetRef.Namespace, mission.TargetRef.Name)
if id, exist := identities[k]; !exist {
mission.Message = fmt.Sprintf("Failed to get identity, key %s. ", k)
return
recommendation := id.Recommendation
if recommendation == nil {
recommendation = CreateRecommendationObject(recommendationRule, id.GetObjectReference(), id, id.Recommender)
} else {
recommendation := existingRecommendation
if recommendation == nil {
recommendation = CreateRecommendationObject(recommendationRule, mission.TargetRef, id, mission.RecommenderRef.Name)
// update existing recommendation's labels
for k, v := range generateRecommendationLabels(recommendationRule, id.GetObjectReference(), id, id.Recommender) {
recommendation.Labels[k] = v
}
}

r, err := recommenderMgr.GetRecommenderWithRule(mission.RecommenderRef.Name, *recommendationRule)
if err != nil {
mission.Message = fmt.Sprintf("get recommender %s failed, %v", mission.RecommenderRef.Name, err)
return
}
r, err := recommenderMgr.GetRecommenderWithRule(id.Recommender, *recommendationRule)
if err != nil {
message = fmt.Sprintf("get recommender %s failed, %v", id.Recommender, err)
} else {
p := make(map[providers.DataSourceType]providers.History)
p[providers.PrometheusDataSource] = provider
identity := framework.ObjectIdentity{
Namespace: identities[k].Namespace,
Name: identities[k].Name,
Kind: identities[k].Kind,
APIVersion: identities[k].APIVersion,
Labels: identities[k].Labels,
Object: identities[k].Object,
Namespace: id.Namespace,
Name: id.Name,
Kind: id.Kind,
APIVersion: id.APIVersion,
Labels: id.Labels,
Object: id.Object,
}
recommendationContext := framework.NewRecommendationContext(ctx, identity, recommendationRule, predictorMgr, p, recommendation, client, scaleClient, oomRecorder)
err = recommender.Run(&recommendationContext, r)
if err != nil {
mission.Message = fmt.Sprintf("Failed to run recommendation flow in recommender %s: %s", r.Name(), err.Error())
return
}

recommendation.Status.LastUpdateTime = &timeNow
if recommendation.Annotations == nil {
recommendation.Annotations = map[string]string{}
message = fmt.Sprintf("Failed to run recommendation flow in recommender %s: %s", r.Name(), err.Error())
}
recommendation.Annotations[known.RunNumberAnnotation] = strconv.Itoa(int(currentRunNumber))
}

if existingRecommendation != nil {
klog.Infof("Update recommendation %s", klog.KObj(recommendation))
if err := client.Update(ctx, recommendation); err != nil {
mission.Message = fmt.Sprintf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err)
return
}
if len(message) == 0 {
message = "Success"
}

klog.Infof("Successfully to update Recommendation %s", klog.KObj(recommendation))
} else {
klog.Infof("Create recommendation %s", klog.KObj(recommendation))
if err := client.Create(ctx, recommendation); err != nil {
mission.Message = fmt.Sprintf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err)
return
}
recommendation.Status.LastUpdateTime = &timeNow
if recommendation.Annotations == nil {
recommendation.Annotations = map[string]string{}
}
recommendation.Annotations[known.RunNumberAnnotation] = strconv.Itoa(int(currentRunNumber))
recommendation.Annotations[known.MessageAnnotation] = message
utils.SetLastStartTime(recommendation)

if id.Recommendation != nil {
klog.Infof("Update recommendation %s", klog.KObj(recommendation))
if err := client.Update(ctx, recommendation); err != nil {
klog.Errorf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err)
return
}

klog.Infof("Successfully to create Recommendation %s", klog.KObj(recommendation))
klog.Infof("Successfully to update Recommendation %s", klog.KObj(recommendation))
} else {
klog.Infof("Create recommendation %s", klog.KObj(recommendation))
if err := client.Create(ctx, recommendation); err != nil {
klog.Errorf("Failed to create recommendation %s: %v", klog.KObj(recommendation), err)
return
}

mission.Message = "Success"
mission.UID = recommendation.UID
mission.Name = recommendation.Name
mission.Namespace = recommendation.Namespace
mission.Kind = recommendation.Kind
mission.APIVersion = recommendation.APIVersion
klog.Infof("Successfully to create Recommendation %s", klog.KObj(recommendation))
}
}

Expand Down
Loading

0 comments on commit 92b5605

Please sign in to comment.