Skip to content

Commit

Permalink
feat: daemonset (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
devthejo authored Jul 1, 2024
2 parents f6ad4c8 + d0bc89d commit e78cd23
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 220 deletions.
43 changes: 43 additions & 0 deletions pkg/target/target-recommandation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package target

import (
"github.com/SocialGouv/oblik/pkg/config"
"k8s.io/apimachinery/pkg/api/resource"
vpa "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
)

type TargetRecommandation struct {
Cpu *resource.Quantity
Memory *resource.Quantity
ContainerName string
}

func getTargetRecommandations(vpaResource *vpa.VerticalPodAutoscaler, vcfg *config.VpaWorkloadCfg) []TargetRecommandation {
recommandations := []TargetRecommandation{}
if vpaResource.Status.Recommendation != nil {
for _, containerRecommendation := range vpaResource.Status.Recommendation.ContainerRecommendations {
containerName := containerRecommendation.ContainerName
recommandation := TargetRecommandation{
ContainerName: containerName,
}
switch vcfg.GetRequestCpuApplyTarget(containerName) {
case config.ApplyTargetFrugal:
recommandation.Cpu = containerRecommendation.LowerBound.Cpu()
case config.ApplyTargetBalanced:
recommandation.Cpu = containerRecommendation.Target.Cpu()
case config.ApplyTargetPeak:
recommandation.Cpu = containerRecommendation.UpperBound.Cpu()
}
switch vcfg.GetRequestMemoryApplyTarget(containerName) {
case config.ApplyTargetFrugal:
recommandation.Memory = containerRecommendation.LowerBound.Memory()
case config.ApplyTargetBalanced:
recommandation.Memory = containerRecommendation.Target.Memory()
case config.ApplyTargetPeak:
recommandation.Memory = containerRecommendation.UpperBound.Memory()
}
recommandations = append(recommandations, recommandation)
}
}
return recommandations
}
234 changes: 14 additions & 220 deletions pkg/target/target.go
Original file line number Diff line number Diff line change
@@ -1,241 +1,22 @@
package target

import (
"context"
"encoding/json"
"fmt"

"github.com/SocialGouv/oblik/pkg/calculator"
"github.com/SocialGouv/oblik/pkg/config"
"github.com/SocialGouv/oblik/pkg/reporting"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
jsonpatch "github.com/evanphx/json-patch/v5"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
vpa "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

type TargetRecommandation struct {
Cpu *resource.Quantity
Memory *resource.Quantity
ContainerName string
}

var FieldManager = "oblik-operator"

func UpdateDeployment(clientset *kubernetes.Clientset, vpa *vpa.VerticalPodAutoscaler, vcfg *config.VpaWorkloadCfg) (*reporting.UpdateResult, error) {
namespace := vpa.Namespace
targetRef := vpa.Spec.TargetRef
deploymentName := targetRef.Name
deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Error fetching deployment: %s", err.Error())
}

update := updateContainerResources(deployment.Spec.Template.Spec.Containers, vpa, vcfg)

patchData, err := createPatch(deployment, "apps/v1", "Deployment")
if err != nil {
return nil, fmt.Errorf("Error creating patch: %s", err.Error())
}

if !vcfg.GetDryRun() {
force := true
_, err = clientset.AppsV1().Deployments(namespace).Patch(context.TODO(), deploymentName, types.ApplyPatchType, patchData, metav1.PatchOptions{
FieldManager: FieldManager,
Force: &force, // Force the apply to take ownership of the fields
})
if err != nil {
update.Type = reporting.ResultTypeFailed
update.Error = err
return nil, fmt.Errorf("Error applying patch to deployment: %s", err.Error())
}
update.Type = reporting.ResultTypeSuccess
} else {
update.Type = reporting.ResultTypeDryRun
}
return update, nil
}

func UpdateCronJob(clientset *kubernetes.Clientset, vpa *vpa.VerticalPodAutoscaler, vcfg *config.VpaWorkloadCfg) (*reporting.UpdateResult, error) {
namespace := vpa.Namespace
targetRef := vpa.Spec.TargetRef
cronjobName := targetRef.Name

cronjob, err := clientset.BatchV1().CronJobs(namespace).Get(context.TODO(), cronjobName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Error fetching cronjob: %s", err.Error())
}

update := updateContainerResources(cronjob.Spec.JobTemplate.Spec.Template.Spec.Containers, vpa, vcfg)

patchData, err := createPatch(cronjob, "batch/v1", "CronJob")
if err != nil {
return nil, fmt.Errorf("Error creating patch: %s", err.Error())
}

if !vcfg.GetDryRun() {
force := true
_, err = clientset.BatchV1().CronJobs(namespace).Patch(context.TODO(), cronjobName, types.ApplyPatchType, patchData, metav1.PatchOptions{
FieldManager: FieldManager,
Force: &force, // Force the apply to take ownership of the fields
})
if err != nil {
update.Type = reporting.ResultTypeFailed
update.Error = err
return nil, fmt.Errorf("Error applying patch to deployment: %s", err.Error())
}
update.Type = reporting.ResultTypeSuccess
} else {
update.Type = reporting.ResultTypeDryRun
}
return update, nil
}

func UpdateStatefulSet(clientset *kubernetes.Clientset, vpa *vpa.VerticalPodAutoscaler, vcfg *config.VpaWorkloadCfg) (*reporting.UpdateResult, error) {
namespace := vpa.Namespace
targetRef := vpa.Spec.TargetRef
statefulSetName := targetRef.Name

statefulSet, err := clientset.AppsV1().StatefulSets(namespace).Get(context.TODO(), statefulSetName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Error fetching stateful set: %s", err.Error())
}

update := updateContainerResources(statefulSet.Spec.Template.Spec.Containers, vpa, vcfg)

patchData, err := createPatch(statefulSet, "apps/v1", "StatefulSet")
if err != nil {
return nil, fmt.Errorf("Error creating patch: %s", err.Error())
}

if !vcfg.GetDryRun() {
force := true
_, err = clientset.AppsV1().StatefulSets(namespace).Patch(context.TODO(), statefulSetName, types.ApplyPatchType, patchData, metav1.PatchOptions{
FieldManager: FieldManager,
Force: &force, // Force the apply to take ownership of the fields
})
if err != nil {
update.Type = reporting.ResultTypeFailed
update.Error = err
return nil, fmt.Errorf("Error applying patch to statefulset: %s", err.Error())
}
update.Type = reporting.ResultTypeSuccess
} else {
update.Type = reporting.ResultTypeDryRun
}
return update, nil
}

func UpdateCluster(dynamicClient *dynamic.DynamicClient, vpa *vpa.VerticalPodAutoscaler, vcfg *config.VpaWorkloadCfg) (*reporting.UpdateResult, error) {
namespace := vpa.Namespace
targetRef := vpa.Spec.TargetRef
clusterName := targetRef.Name

gvr := schema.GroupVersionResource{
Group: "postgresql.cnpg.io",
Version: "v1",
Resource: "clusters",
}

clusterResource, err := dynamicClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), clusterName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Error fetching cluster: %s", err.Error())
}

originalClusterJSON, err := json.Marshal(clusterResource)
if err != nil {
return nil, fmt.Errorf("Error marshalling original cluster: %s", err.Error())
}

var cluster cnpgv1.Cluster
err = json.Unmarshal(originalClusterJSON, &cluster)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling cluster: %s", err.Error())
}

containers := []corev1.Container{
corev1.Container{
Name: "postgres",
Resources: cluster.Spec.Resources,
},
}
update := updateContainerResources(containers, vpa, vcfg)
cluster.Spec.Resources = containers[0].Resources

updatedClusterJSON, err := json.Marshal(cluster)
if err != nil {
return nil, fmt.Errorf("Error marshalling updated cluster: %s", err.Error())
}

patchBytes, err := jsonpatch.CreateMergePatch(originalClusterJSON, updatedClusterJSON)
if err != nil {
return nil, fmt.Errorf("Error creating patch: %s", err.Error())
}

if !vcfg.GetDryRun() {
_, err = dynamicClient.Resource(gvr).Namespace(namespace).Patch(context.TODO(), clusterName, types.MergePatchType, patchBytes, metav1.PatchOptions{
FieldManager: FieldManager,
})
if err != nil {
update.Type = reporting.ResultTypeFailed
update.Error = err
return nil, fmt.Errorf("Error applying patch to cluster: %s", err.Error())
}
update.Type = reporting.ResultTypeSuccess
} else {
update.Type = reporting.ResultTypeDryRun
}
return update, nil
}

func findContainerPolicy(vpaResource *vpa.VerticalPodAutoscaler, containerName string) *vpa.ContainerResourcePolicy {
for _, containerPolicy := range vpaResource.Spec.ResourcePolicy.ContainerPolicies {
if containerPolicy.ContainerName == containerName || containerPolicy.ContainerName == "*" {
return &containerPolicy
}
}
return nil
}

func getTargetRecommandations(vpaResource *vpa.VerticalPodAutoscaler, vcfg *config.VpaWorkloadCfg) []TargetRecommandation {
recommandations := []TargetRecommandation{}
if vpaResource.Status.Recommendation != nil {
for _, containerRecommendation := range vpaResource.Status.Recommendation.ContainerRecommendations {
containerName := containerRecommendation.ContainerName
recommandation := TargetRecommandation{
ContainerName: containerName,
}
switch vcfg.GetRequestCpuApplyTarget(containerName) {
case config.ApplyTargetFrugal:
recommandation.Cpu = containerRecommendation.LowerBound.Cpu()
case config.ApplyTargetBalanced:
recommandation.Cpu = containerRecommendation.Target.Cpu()
case config.ApplyTargetPeak:
recommandation.Cpu = containerRecommendation.UpperBound.Cpu()
}
switch vcfg.GetRequestMemoryApplyTarget(containerName) {
case config.ApplyTargetFrugal:
recommandation.Memory = containerRecommendation.LowerBound.Memory()
case config.ApplyTargetBalanced:
recommandation.Memory = containerRecommendation.Target.Memory()
case config.ApplyTargetPeak:
recommandation.Memory = containerRecommendation.UpperBound.Memory()
}
recommandations = append(recommandations, recommandation)
}
}
return recommandations
}

func setUnprovidedDefaultRecommandations(containers []corev1.Container, recommandations []TargetRecommandation, vpaResource *vpa.VerticalPodAutoscaler, vcfg *config.VpaWorkloadCfg) []TargetRecommandation {
for _, container := range containers {
containerName := container.Name
Expand Down Expand Up @@ -441,7 +222,6 @@ func updateContainerResources(containers []corev1.Container, vpaResource *vpa.Ve

func createPatch(obj interface{}, apiVersion, kind string) ([]byte, error) {
var patchedObj interface{}

switch t := obj.(type) {
case *appsv1.Deployment:
patchedObj = t.DeepCopy()
Expand All @@ -453,6 +233,11 @@ func createPatch(obj interface{}, apiVersion, kind string) ([]byte, error) {
patchedObj.(*appsv1.StatefulSet).APIVersion = apiVersion
patchedObj.(*appsv1.StatefulSet).Kind = kind
patchedObj.(*appsv1.StatefulSet).ObjectMeta.ManagedFields = nil
case *appsv1.DaemonSet:
patchedObj = t.DeepCopy()
patchedObj.(*appsv1.DaemonSet).APIVersion = apiVersion
patchedObj.(*appsv1.DaemonSet).Kind = kind
patchedObj.(*appsv1.DaemonSet).ObjectMeta.ManagedFields = nil
case *batchv1.CronJob:
patchedObj = t.DeepCopy()
patchedObj.(*batchv1.CronJob).APIVersion = apiVersion
Expand All @@ -468,3 +253,12 @@ func createPatch(obj interface{}, apiVersion, kind string) ([]byte, error) {
}
return jsonData, nil
}

func findContainerPolicy(vpaResource *vpa.VerticalPodAutoscaler, containerName string) *vpa.ContainerResourcePolicy {
for _, containerPolicy := range vpaResource.Spec.ResourcePolicy.ContainerPolicies {
if containerPolicy.ContainerName == containerName || containerPolicy.ContainerName == "*" {
return &containerPolicy
}
}
return nil
}
80 changes: 80 additions & 0 deletions pkg/target/update-cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package target

import (
"context"
"encoding/json"
"fmt"

"github.com/SocialGouv/oblik/pkg/config"
"github.com/SocialGouv/oblik/pkg/reporting"
cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
jsonpatch "github.com/evanphx/json-patch/v5"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
vpa "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/client-go/dynamic"
)

func UpdateCluster(dynamicClient *dynamic.DynamicClient, vpa *vpa.VerticalPodAutoscaler, vcfg *config.VpaWorkloadCfg) (*reporting.UpdateResult, error) {
namespace := vpa.Namespace
targetRef := vpa.Spec.TargetRef
clusterName := targetRef.Name

gvr := schema.GroupVersionResource{
Group: "postgresql.cnpg.io",
Version: "v1",
Resource: "clusters",
}

clusterResource, err := dynamicClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), clusterName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Error fetching cluster: %s", err.Error())
}

originalClusterJSON, err := json.Marshal(clusterResource)
if err != nil {
return nil, fmt.Errorf("Error marshalling original cluster: %s", err.Error())
}

var cluster cnpgv1.Cluster
err = json.Unmarshal(originalClusterJSON, &cluster)
if err != nil {
return nil, fmt.Errorf("Error unmarshalling cluster: %s", err.Error())
}

containers := []corev1.Container{
corev1.Container{
Name: "postgres",
Resources: cluster.Spec.Resources,
},
}
update := updateContainerResources(containers, vpa, vcfg)
cluster.Spec.Resources = containers[0].Resources

updatedClusterJSON, err := json.Marshal(cluster)
if err != nil {
return nil, fmt.Errorf("Error marshalling updated cluster: %s", err.Error())
}

patchBytes, err := jsonpatch.CreateMergePatch(originalClusterJSON, updatedClusterJSON)
if err != nil {
return nil, fmt.Errorf("Error creating patch: %s", err.Error())
}

if !vcfg.GetDryRun() {
_, err = dynamicClient.Resource(gvr).Namespace(namespace).Patch(context.TODO(), clusterName, types.MergePatchType, patchBytes, metav1.PatchOptions{
FieldManager: FieldManager,
})
if err != nil {
update.Type = reporting.ResultTypeFailed
update.Error = err
return nil, fmt.Errorf("Error applying patch to cluster: %s", err.Error())
}
update.Type = reporting.ResultTypeSuccess
} else {
update.Type = reporting.ResultTypeDryRun
}
return update, nil
}
Loading

0 comments on commit e78cd23

Please sign in to comment.