Change the formula cat-gate uses to schedule pods (#20)
* Change the formula cat-gate uses to schedule pods

Signed-off-by: zeroalphat <[email protected]>

* Change the writing style so that the unit comes after the value when calculating durations.

Signed-off-by: zeroalphat <[email protected]>

* Change the conditional expression of numImagePullingPods

Signed-off-by: zeroalphat <[email protected]>

---------

Signed-off-by: zeroalphat <[email protected]>
Co-authored-by: Masayuki Ishii <[email protected]>
zeroalphat and masa213f authored May 24, 2024
1 parent 67a5a1e commit 77529fb
Showing 7 changed files with 200 additions and 44 deletions.
6 changes: 6 additions & 0 deletions cmd/main.go
@@ -36,6 +36,7 @@ import (
"github.com/cybozu-go/cat-gate/hooks"
"github.com/cybozu-go/cat-gate/internal/controller"
"github.com/cybozu-go/cat-gate/internal/indexing"
"github.com/cybozu-go/cat-gate/internal/runners"
//+kubebuilder:scaffold:imports
)

@@ -114,6 +115,11 @@ func main() {
}
//+kubebuilder:scaffold:builder

if err = mgr.Add(runners.GarbageCollector{}); err != nil {
setupLog.Error(err, "unable to add garbage collector")
os.Exit(1)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
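The hunk above registers runners.GarbageCollector{} with the manager, but the runner itself is outside this diff. A minimal sketch of what such a runner could look like, assuming only controller-runtime's manager.Runnable contract (Start(ctx) error), which is what mgr.Add expects; the cleanup work and interval below are placeholders, not cat-gate's actual implementation:

package runners

import (
	"context"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// GarbageCollector sketches a runner that satisfies
// sigs.k8s.io/controller-runtime/pkg/manager.Runnable; the real
// implementation is not shown in this commit.
type GarbageCollector struct{}

// Start runs until the manager's context is cancelled.
func (g GarbageCollector) Start(ctx context.Context) error {
	logger := ctrl.Log.WithName("garbage-collector")
	ticker := time.NewTicker(time.Minute) // placeholder interval
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			// Placeholder work: pruning stale entries from
			// controller.GateRemovalHistories would fit here, but that is
			// an assumption about what the real runner cleans up.
			logger.V(1).Info("periodic cleanup tick")
		}
	}
}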
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
@@ -4,6 +4,14 @@ kind: ClusterRole
metadata:
name: manager-role
rules:
- apiGroups:
- ""
resources:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
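This nodes rule corresponds to the //+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch marker added to pod_controller.go further down in this commit; in a kubebuilder-style layout, config/rbac/role.yaml is typically regenerated from those markers (for example with make manifests, which runs controller-gen) rather than edited by hand.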
2 changes: 1 addition & 1 deletion e2e/Makefile
@@ -27,7 +27,7 @@ setup:
mkdir -p $(BIN_DIR)
$(CURL) -o $(BIN_DIR)/kubectl https://storage.googleapis.com/kubernetes-release/release/v$(E2ETEST_K8S_VERSION)/bin/linux/amd64/kubectl && chmod a+x $(BIN_DIR)/kubectl
# TODO: specify kind version
GOBIN=$(BIN_DIR) go install sigs.k8s.io/kind@latest
GOBIN=$(BIN_DIR) go install sigs.k8s.io/kind@latest

.PHONY: start
start:
3 changes: 3 additions & 0 deletions internal/constants/constants.go
@@ -6,3 +6,6 @@ const PodSchedulingGateName = MetaPrefix + "gate"
const CatGateImagesHashAnnotation = MetaPrefix + "images-hash"

const ImageHashAnnotationField = ".metadata.annotations.images-hash"

const LevelWarning = 1
const LevelDebug = -1
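
constants.ImageHashAnnotationField is the key the controller hands to client.MatchingFields when listing pods that request the same image set (see pod_controller.go below). The field index itself is registered in internal/indexing, which this commit does not touch; a sketch of such a registration, assuming controller-runtime's FieldIndexer API — the function name here is hypothetical:

package indexing

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/cybozu-go/cat-gate/internal/constants"
)

// SetupPodImagesHashIndex is a hypothetical name; cat-gate's real setup
// lives in internal/indexing and is not part of this diff.
func SetupPodImagesHashIndex(ctx context.Context, mgr ctrl.Manager) error {
	return mgr.GetFieldIndexer().IndexField(ctx, &corev1.Pod{}, constants.ImageHashAnnotationField,
		func(obj client.Object) []string {
			pod := obj.(*corev1.Pod)
			hash, ok := pod.Annotations[constants.CatGateImagesHashAnnotation]
			if !ok {
				return nil
			}
			// Pods become listable via
			// client.MatchingFields{constants.ImageHashAnnotationField: hash}.
			return []string{hash}
		})
}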
88 changes: 68 additions & 20 deletions internal/controller/pod_controller.go
@@ -18,6 +18,8 @@ package controller

import (
"context"
"slices"
"sync"
"time"

"github.com/cybozu-go/cat-gate/internal/constants"
@@ -42,14 +44,14 @@ const scaleRate = 2
// minimumCapacity is the number of scheduling gates to remove when no node has the image.
const minimumCapacity = 1

const levelWarning = 1
const levelDebug = -1

var requeueSeconds = 10
var gateRemovalDelayMilliSecond = 10
var GateRemovalHistories = sync.Map{}

//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=core,resources=pods/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
@@ -62,13 +64,17 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !existsSchedulingGate(reqPod) {
return ctrl.Result{}, nil
}

if reqPod.DeletionTimestamp != nil {
return ctrl.Result{}, nil
}

annotations := reqPod.Annotations
if _, ok := annotations[constants.CatGateImagesHashAnnotation]; !ok {
logger.V(levelWarning).Info("pod annotation not found")
logger.V(constants.LevelWarning).Info("pod annotation not found")
err := r.removeSchedulingGate(ctx, reqPod)
if err != nil {
logger.Error(err, "failed to remove scheduling gate")
@@ -78,15 +84,64 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}
reqImagesHash := annotations[constants.CatGateImagesHashAnnotation]

// prevents removing the scheduling gate based on stale information from before the cache is updated.
if value, ok := GateRemovalHistories.Load(reqImagesHash); ok {
lastGateRemovalTime := value.(time.Time)
if time.Since(lastGateRemovalTime) < time.Duration(gateRemovalDelayMilliSecond)*time.Millisecond {
logger.V(constants.LevelDebug).Info("perform retry processing to avoid race conditions", "lastGateRemovalTime", lastGateRemovalTime)
return ctrl.Result{RequeueAfter: time.Duration(gateRemovalDelayMilliSecond) * time.Millisecond}, nil
}
}

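// Collect every image the pod requires, from init containers and regular containers alike.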
var reqImageList []string
for _, initContainer := range reqPod.Spec.InitContainers {
if initContainer.Image == "" {
continue
}
reqImageList = append(reqImageList, initContainer.Image)
}
for _, container := range reqPod.Spec.Containers {
if container.Image == "" {
continue
}
reqImageList = append(reqImageList, container.Image)
}

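// List all nodes and record which images each node already holds.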
nodes := &corev1.NodeList{}
err = r.List(ctx, nodes)
if err != nil {
logger.Error(err, "failed to list nodes")
return ctrl.Result{}, err
}

nodeImageSet := make(map[string][]string)
for _, node := range nodes.Items {
for _, image := range node.Status.Images {
nodeImageSet[node.Name] = append(nodeImageSet[node.Name], image.Names...)
}
}

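// nodeSet: the nodes that already hold every required image; its size feeds the capacity calculation logged below.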
nodeSet := make(map[string]struct{})
for nodeName, images := range nodeImageSet {
allImageExists := true
for _, reqImage := range reqImageList {
if !slices.Contains(images, reqImage) {
allImageExists = false
break
}
}
if allImageExists {
nodeSet[nodeName] = struct{}{}
}
}

pods := &corev1.PodList{}
err = r.List(ctx, pods, client.MatchingFields{constants.ImageHashAnnotationField: reqImagesHash})
if err != nil {
logger.Error(err, "failed to list pods")
return ctrl.Result{}, err
}

nodeSet := make(map[string]struct{})

numSchedulablePods := 0
numImagePulledPods := 0

@@ -96,17 +151,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}
numSchedulablePods += 1

allStarted := true
statuses := pod.Status.ContainerStatuses
for _, status := range statuses {
if status.State.Running == nil && status.State.Terminated == nil {
allStarted = false
break
}
}

if allStarted && len(pod.Spec.Containers) == len(statuses) {
nodeSet[pod.Status.HostIP] = struct{}{}
if pod.Status.Phase != corev1.PodPending {
numImagePulledPods += 1
}
}
@@ -116,21 +161,24 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if capacity < minimumCapacity {
capacity = minimumCapacity
}
logger.V(levelDebug).Info("schedule capacity", "capacity", capacity, "len(nodeSet)", len(nodeSet))
logger.V(constants.LevelDebug).Info("schedule capacity", "capacity", capacity, "len(nodeSet)", len(nodeSet))

numImagePullingPods := numSchedulablePods - numImagePulledPods
logger.V(levelDebug).Info("scheduling progress", "numSchedulablePods", numSchedulablePods, "numImagePulledPods", numImagePulledPods, "numImagePullingPods", numImagePullingPods)
logger.V(constants.LevelDebug).Info("scheduling progress", "numSchedulablePods", numSchedulablePods, "numImagePulledPods", numImagePulledPods, "numImagePullingPods", numImagePullingPods)

if capacity > numImagePullingPods {
err := r.removeSchedulingGate(ctx, reqPod)
if err != nil {
logger.Error(err, "failed to remove scheduling gate")
return ctrl.Result{}, err
}
now := time.Now()
GateRemovalHistories.Store(reqImagesHash, now)
return ctrl.Result{}, nil
}

return ctrl.Result{
RequeueAfter: time.Second * time.Duration(requeueSeconds),
RequeueAfter: time.Duration(requeueSeconds) * time.Second,
}, nil
}

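Taken together, the hunks above change the formula as follows: nodeSet now counts the nodes whose node.Status.Images already contain every image the pod needs, a peer pod counts as image-pulled as soon as it leaves the Pending phase, and a gate is removed only while capacity exceeds the number of pods still pulling. The exact capacity expression sits in a collapsed part of the diff, so deriving it from len(nodeSet) and scaleRate in the self-contained sketch below is an assumption; the clamping and the comparison match the visible code:

package main

import "fmt"

const (
	scaleRate       = 2
	minimumCapacity = 1
)

func gateCanBeRemoved(nodesWithImages, numSchedulablePods, numImagePulledPods int) bool {
	// Assumption: capacity grows with the number of nodes that already hold
	// the required images; the exact expression is collapsed in the diff above.
	capacity := nodesWithImages * scaleRate
	if capacity < minimumCapacity {
		capacity = minimumCapacity
	}
	// Pods whose gate is already removed but which are still Pending count as
	// "still pulling images" under the new formula.
	numImagePullingPods := numSchedulablePods - numImagePulledPods
	return capacity > numImagePullingPods
}

func main() {
	// 3 nodes already hold the images, 10 peers are schedulable, 5 have left
	// Pending: capacity 6 > 5 pods still pulling, so another gate may be removed.
	fmt.Println(gateCanBeRemoved(3, 10, 5)) // true
	// No node holds the images and no peer is scheduled yet: the clamp to
	// minimumCapacity releases exactly one pod to pull first.
	fmt.Println(gateCanBeRemoved(0, 0, 0)) // true
	// That first pod is schedulable but still Pending: 1 > 1 is false, so the
	// remaining pods are requeued until its image pull completes.
	fmt.Println(gateCanBeRemoved(0, 1, 0)) // false
}

Even when no node has the images yet, the clamp to minimumCapacity lets a single pod through, which is how the first image pull gets started at all.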