Skip to content

Commit

Permalink
Merge pull request #53 from lwolf/51-fix-pending
Browse files Browse the repository at this point in the history
  • Loading branch information
lwolf authored Jul 4, 2020
2 parents 261a923 + 3302bc1 commit 6436dfd
Show file tree
Hide file tree
Showing 8 changed files with 495 additions and 162 deletions.
26 changes: 16 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ go:
services:
- docker

before_deploy:
- echo "$QUAY_PASSWORD" | docker login -u "$QUAY_LOGIN" quay.io --password-stdin

deploy:
- provider: script
skip_cleanup: true
script: curl -sL http://git.io/goreleaser | bash
on:
tags: true
condition: $TRAVIS_OS_NAME = linux
jobs:
include:
- stage: Test
script: make test
after_success:
- bash <(curl -s https://codecov.io/bash)
- stage: Release
before_script:
- echo "$QUAY_PASSWORD" | docker login -u "$QUAY_LOGIN" quay.io --password-stdin
deploy:
- provider: script
skip_cleanup: true
script: curl -sL http://git.io/goreleaser | bash
on:
tags: true
condition: $TRAVIS_OS_NAME = linux
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ NAME := kube-cleanup-operator
AUTHOR=lwolf
VERSION ?= 0.6.0
REGISTRY ?= quay.io
GIT_SHA=$(shell git --no-pager describe --always --dirty)
GIT_SHA=$(shell git rev-list --count HEAD)-$(shell git rev-parse --short=7 HEAD)
COMMIT_TIME=$(shell git show --format=%ct --no-patch)
LFLAGS ?= -X main.gitsha=${GIT_SHA} -X main.committed=${COMMIT_TIME}
ROOT_DIR=${PWD}
GOVERSION ?= 1.14.0
HARDWARE=$(shell uname -m)

.PHONY: build docker static release install_deps
.PHONY: build docker static release install_deps test

default: build

Expand All @@ -26,6 +26,9 @@ build: golang
@mkdir -p bin
go build -mod=vendor -ldflags "${LFLAGS}" -o bin/$(NAME) ./cmd

test:
go test -race ./pkg/... -coverprofile=coverage.txt -covermode=atomic

static: golang
@echo "--> Compiling the static binary"
@mkdir -p bin
Expand Down
10 changes: 9 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"github.com/lwolf/kube-cleanup-operator/pkg/controller"
)

var (
gitsha string
committed string
)

func setupLogging() {
// Set logging output to standard console out
log.SetOutput(os.Stdout)
Expand All @@ -45,6 +50,7 @@ func main() {
deleteOrphanedAfter := flag.Duration("delete-orphaned-pods-after", 1*time.Hour, "Delete orphaned pods. Pods without an owner in non-running state (golang duration format, e.g 5m), 0 - never delete")
deleteEvictedAfter := flag.Duration("delete-evicted-pods-after", 15*time.Minute, "Delete pods in evicted state (golang duration format, e.g 5m), 0 - never delete")
deletePendingAfter := flag.Duration("delete-pending-pods-after", 0, "Delete pods in pending state after X duration (golang duration format, e.g 5m), 0 - never delete")
ignoreOwnedByCronjob := flag.Bool("ignore-owned-by-cronjobs", false, "[EXPERIMENTAL] Do not cleanup pods and jobs created by cronjobs")

legacyKeepSuccessHours := flag.Int64("keep-successful", 0, "Number of hours to keep successful jobs, -1 - forever, 0 - never (default), >0 number of hours")
legacyKeepFailedHours := flag.Int64("keep-failures", -1, "Number of hours to keep failed jobs, -1 - forever (default) 0 - never, >0 number of hours")
Expand All @@ -55,7 +61,7 @@ func main() {
flag.Parse()
setupLogging()

log.Println("Starting the application.")
log.Printf("Starting the application. Version: %s, CommitTime: %s\n", gitsha, committed)
var optsInfo strings.Builder
optsInfo.WriteString("Provided options: \n")
optsInfo.WriteString(fmt.Sprintf("\tnamespace: %s\n", *namespace))
Expand All @@ -65,6 +71,7 @@ func main() {
optsInfo.WriteString(fmt.Sprintf("\tdelete-pending-after: %s\n", *deletePendingAfter))
optsInfo.WriteString(fmt.Sprintf("\tdelete-orphaned-after: %s\n", *deleteOrphanedAfter))
optsInfo.WriteString(fmt.Sprintf("\tdelete-evicted-after: %s\n", *deleteEvictedAfter))
optsInfo.WriteString(fmt.Sprintf("\tignore-owned-by-cronjobs: %v\n", *ignoreOwnedByCronjob))

optsInfo.WriteString(fmt.Sprintf("\n\tlegacy-mode: %v\n", *legacyMode))
optsInfo.WriteString(fmt.Sprintf("\tkeep-successful: %d\n", *legacyKeepSuccessHours))
Expand Down Expand Up @@ -121,6 +128,7 @@ func main() {
*deletePendingAfter,
*deleteOrphanedAfter,
*deleteEvictedAfter,
*ignoreOwnedByCronjob,
stopCh,
).Run()
}
Expand Down
167 changes: 18 additions & 149 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,17 @@ type Kleaner struct {
deleteOrphanedAfter time.Duration
deleteEvictedAfter time.Duration

ignoreOwnedByCronjob bool

dryRun bool
ctx context.Context
stopCh <-chan struct{}
}

// NewKleaner creates a new NewKleaner
func NewKleaner(ctx context.Context, kclient *kubernetes.Clientset, namespace string, dryRun bool, deleteSuccessfulAfter,
deleteFailedAfter, deletePendingAfter, deleteOrphanedAfter, deleteEvictedAfter time.Duration, stopCh <-chan struct{}) *Kleaner {
deleteFailedAfter, deletePendingAfter, deleteOrphanedAfter, deleteEvictedAfter time.Duration, ignoreOwnedByCronjob bool,
stopCh <-chan struct{}) *Kleaner {
jobInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
Expand Down Expand Up @@ -95,6 +98,7 @@ func NewKleaner(ctx context.Context, kclient *kubernetes.Clientset, namespace st
deletePendingAfter: deletePendingAfter,
deleteOrphanedAfter: deleteOrphanedAfter,
deleteEvictedAfter: deleteEvictedAfter,
ignoreOwnedByCronjob: ignoreOwnedByCronjob,
}
jobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
Expand Down Expand Up @@ -154,87 +158,32 @@ func (c *Kleaner) Process(obj interface{}) {
if !t.DeletionTimestamp.IsZero() {
return
}
job := t
// skip the job if it has any active pods
if job.Status.Active > 0 {
return
}

owners := getJobOwnerKinds(job)
if isOwnedByCronJob(owners) {
return
}

finishTime := extractJobFinishTime(job)

if finishTime.IsZero() {
return
}

timeSinceFinish := time.Since(finishTime)

if job.Status.Succeeded > 0 {
if c.deleteSuccessfulAfter > 0 && timeSinceFinish > c.deleteSuccessfulAfter {
c.deleteJobs(job)
}
}
if job.Status.Failed > 0 {
if c.deleteFailedAfter > 0 && timeSinceFinish >= c.deleteFailedAfter {
c.deleteJobs(job)
}
if shouldDeleteJob(t, c.deleteSuccessfulAfter, c.deleteFailedAfter, c.ignoreOwnedByCronjob) {
c.DeleteJob(t)
}

case *corev1.Pod:
// skip pods that are already in the deleting process
if !t.DeletionTimestamp.IsZero() {
return
}
pod := t
owners := getPodOwnerKinds(pod)
podFinishTime := extractPodFinishTime(pod)
if podFinishTime.IsZero() {
return
}
age := time.Since(podFinishTime)
// orphaned pod: those that do not have any owner references
// - uses c.deleteOrphanedAfter
if len(owners) == 0 {
if c.deleteOrphanedAfter > 0 && age >= c.deleteOrphanedAfter {
c.deletePods(pod)
}
// skip pods that are already in the deleting process
if !pod.DeletionTimestamp.IsZero() {
return
}
// owned by job, have exactly one ownerReference present and its kind is Job
// - uses the c.deleteSuccessfulAfter, c.deleteFailedAfter, c.deletePendingAfter
if isOwnedByJob(owners) {
jobOwnerName := pod.OwnerReferences[0].Name
jobOwner, exists, err := c.jobInformer.GetStore().GetByKey(pod.Namespace + "/" + jobOwnerName)
if err != nil {
log.Printf("Can't find job '%s:%s`", pod.Namespace, jobOwnerName)

} else if exists && isOwnedByCronJob(getJobOwnerKinds(jobOwner.(*batchv1.Job))) {
return
}
toDelete := c.maybeDeletePod(pod.Status.Phase, age)
if toDelete {
c.deletePods(pod)
}
// skip pods related to jobs created by cronjobs if `ignoreOwnedByCronjob` is set
if c.ignoreOwnedByCronjob && podRelatedToCronJob(pod, c.jobInformer.GetStore()) {
return
}
// evicted pods, those with or without owner references, but in Evicted state
// - uses c.deleteEvictedAfter
if pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted" && c.deleteEvictedAfter > 0 && age >= c.deleteEvictedAfter {
c.deletePods(pod)
// normal cleanup flow
if shouldDeletePod(t, c.deleteOrphanedAfter, c.deletePendingAfter, c.deleteEvictedAfter, c.deleteSuccessfulAfter, c.deleteFailedAfter) {
c.DeletePod(t)
}
}
}

func (c *Kleaner) deleteJobs(job *batchv1.Job) {
func (c *Kleaner) DeleteJob(job *batchv1.Job) {
if c.dryRun {
log.Printf("dry-run: Job '%s:%s' would have been deleted", job.Namespace, job.Name)
return
}
log.Printf("Deleting job '%s:%s'", job.Namespace, job.Name)
log.Printf("Deleting job '%s/%s'", job.Namespace, job.Name)
propagation := metav1.DeletePropagationForeground
jo := metav1.DeleteOptions{PropagationPolicy: &propagation}
if err := c.kclient.BatchV1().Jobs(job.Namespace).Delete(c.ctx, job.Name, jo); ignoreNotFound(err) != nil {
Expand All @@ -245,12 +194,12 @@ func (c *Kleaner) deleteJobs(job *batchv1.Job) {
metrics.GetOrCreateCounter(metricName(jobDeletedMetric, job.Namespace)).Inc()
}

func (c *Kleaner) deletePods(pod *corev1.Pod) {
func (c *Kleaner) DeletePod(pod *corev1.Pod) {
if c.dryRun {
log.Printf("dry-run: Pod '%s:%s' would have been deleted", pod.Namespace, pod.Name)
return
}
log.Printf("Deleting pod '%s:%s'", pod.Namespace, pod.Name)
log.Printf("Deleting pod '%s/%s'", pod.Namespace, pod.Name)
var po metav1.DeleteOptions
if err := c.kclient.CoreV1().Pods(pod.Namespace).Delete(c.ctx, pod.Name, po); ignoreNotFound(err) != nil {
log.Printf("failed to delete pod '%s:%s': %v", pod.Namespace, pod.Name, err)
Expand All @@ -259,83 +208,3 @@ func (c *Kleaner) deletePods(pod *corev1.Pod) {
}
metrics.GetOrCreateCounter(metricName(podDeletedMetric, pod.Namespace)).Inc()
}

// maybeDeletePod reports whether a pod in the given phase, which finished
// timeSinceFinish ago, has outlived its retention threshold and should be
// deleted. A threshold of 0 means "never delete" for that phase; phases
// other than Succeeded/Failed/Pending are never deleted here.
func (c *Kleaner) maybeDeletePod(podPhase corev1.PodPhase, timeSinceFinish time.Duration) bool {
	// Pick the retention threshold configured for this phase.
	var threshold time.Duration
	switch podPhase {
	case corev1.PodSucceeded:
		threshold = c.deleteSuccessfulAfter
	case corev1.PodFailed:
		threshold = c.deleteFailedAfter
	case corev1.PodPending:
		threshold = c.deletePendingAfter
	default:
		return false
	}
	// 0 disables deletion; otherwise delete once the pod's age since
	// finishing reaches the threshold (inclusive, matching the original >=).
	return threshold > 0 && timeSinceFinish >= threshold
}

// getPodOwnerKinds collects the Kind of every owner reference on the pod
// (e.g. "Job", "ReplicaSet"). The result is only ever inspected via len()
// and element comparison by callers, so returning an empty slice for a
// pod with no owners is equivalent to the previous nil return.
func getPodOwnerKinds(pod *corev1.Pod) []string {
	// Pre-size: the final length is exactly the number of owner references.
	kinds := make([]string, 0, len(pod.OwnerReferences))
	for _, ow := range pod.OwnerReferences {
		kinds = append(kinds, ow.Kind)
	}
	return kinds
}

// getJobOwnerKinds collects the Kind of every owner reference on the job
// (e.g. "CronJob"). The result is only ever inspected via len() and
// element comparison by callers, so returning an empty slice for a job
// with no owners is equivalent to the previous nil return.
func getJobOwnerKinds(job *batchv1.Job) []string {
	// Pre-size: the final length is exactly the number of owner references.
	kinds := make([]string, 0, len(job.OwnerReferences))
	for _, ow := range job.OwnerReferences {
		kinds = append(kinds, ow.Kind)
	}
	return kinds
}

// isOwnedByJob reports whether the owner-kind list describes an object
// owned by exactly one owner whose kind is "Job".
func isOwnedByJob(ownerKinds []string) bool {
	// Return the condition directly instead of the verbose
	// if-true-else-false form.
	return len(ownerKinds) == 1 && ownerKinds[0] == "Job"
}

// isOwnedByCronJob reports whether the owner-kind list describes an object
// owned by exactly one owner whose kind is "CronJob".
func isOwnedByCronJob(ownerKinds []string) bool {
	// Return the condition directly instead of the verbose
	// if-true-else-false form.
	return len(ownerKinds) == 1 && ownerKinds[0] == "CronJob"
}

// extractPodFinishTime returns the instant the pod stopped executing,
// taken from the transition of its "Ready" condition to "false".
// Can return the zero time.Time when no such condition exists; the
// caller must check for that.
func extractPodFinishTime(podObj *corev1.Pod) time.Time {
	var finished time.Time
	for _, cond := range podObj.Status.Conditions {
		// Only the Ready=false transition marks the end of execution.
		if cond.Type != corev1.PodReady || cond.Status != corev1.ConditionFalse {
			continue
		}
		finished = cond.LastTransitionTime.Time
		break
	}
	return finished
}

// extractJobFinishTime determines when a job finished running. It prefers
// the job's CompletionTime; when that is unset it falls back to the
// instant the "Failed" condition became "true". Can return the zero
// time.Time; the caller must check for that.
func extractJobFinishTime(jobObj *batchv1.Job) time.Time {
	// CompletionTime is a *metav1.Time; IsZero is nil-safe, so the
	// dereference below only happens when it is actually set.
	if completed := jobObj.Status.CompletionTime; !completed.IsZero() {
		return completed.Time
	}

	// Failed jobs never get a CompletionTime; use the failure transition.
	for _, cond := range jobObj.Status.Conditions {
		if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
			return cond.LastTransitionTime.Time
		}
	}

	return time.Time{}
}
Loading

0 comments on commit 6436dfd

Please sign in to comment.