Skip to content

Commit

Permalink
(finalizers): refactor (#124)
Browse files Browse the repository at this point in the history
* (finalizers): refactor

* (finalizers): added tests

* (finalizers): convert finalizer logs to events
  • Loading branch information
itamar-marom authored Jan 14, 2024
1 parent a0beeb3 commit da563de
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 86 deletions.
2 changes: 1 addition & 1 deletion controllers/druid/druid_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (r *DruidReconciler) Reconcile(ctx context.Context, request reconcile.Reque
return ctrl.Result{}, err
}

// Intialize Emit Events
// Initialize Emit Events
var emitEvent EventEmitter = EmitEventFuncs{r.Recorder}

if err := deployDruidCluster(ctx, r.Client, instance, emitEvent); err != nil {
Expand Down
116 changes: 116 additions & 0 deletions controllers/druid/finalizers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package druid

import (
"context"
"fmt"

"github.com/datainfrahq/druid-operator/apis/druid/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
deletePVCFinalizerName = "deletepvc.finalizers.druid.apache.org"
)

var (
defaultFinalizers []string
)

func updateFinalizers(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error {
desiredFinalizers := m.GetFinalizers()
additionFinalizers := defaultFinalizers

desiredFinalizers = RemoveString(desiredFinalizers, deletePVCFinalizerName)
if !m.Spec.DisablePVCDeletionFinalizer {
additionFinalizers = append(additionFinalizers, deletePVCFinalizerName)
}

for _, finalizer := range additionFinalizers {
if !ContainsString(desiredFinalizers, finalizer) {
desiredFinalizers = append(desiredFinalizers, finalizer)
}
}

m.SetFinalizers(desiredFinalizers)
_, err := writers.Update(ctx, sdk, m, m, emitEvents)
if err != nil {
return err
}

return nil
}

func executeFinalizers(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error {
if m.Spec.DisablePVCDeletionFinalizer == false {
if err := executePVCFinalizer(ctx, sdk, m, emitEvents); err != nil {
return err
}
}
return nil
}

/*
executePVCFinalizer will execute a PVC deletion of all Druid's PVCs.
Flow:
1. Get sts List and PVC List
2. Range and Delete sts first and then delete pvc. PVC must be deleted after sts termination has been executed
else pvc finalizer shall block deletion since a pod/sts is referencing it.
3. Once delete is executed we block program and return.
*/
func executePVCFinalizer(ctx context.Context, sdk client.Client, druid *v1alpha1.Druid, eventEmitter EventEmitter) error {
if ContainsString(druid.ObjectMeta.Finalizers, deletePVCFinalizerName) {
pvcLabels := map[string]string{
"druid_cr": druid.Name,
}

pvcList, err := readers.List(ctx, sdk, druid, pvcLabels, eventEmitter, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PersistentVolumeClaimList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return err
}

stsList, err := readers.List(ctx, sdk, druid, makeLabelsForDruid(druid.Name), eventEmitter, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object {
items := listObj.(*appsv1.StatefulSetList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return err
}

eventEmitter.EmitEventGeneric(druid, string(druidFinalizerTriggered),
fmt.Sprintf("Trigerring finalizer [%s] for CR [%s] in namespace [%s]", deletePVCFinalizerName, druid.Name, druid.Namespace), nil)

if err = deleteSTSAndPVC(ctx, sdk, druid, stsList, pvcList, eventEmitter); err != nil {
eventEmitter.EmitEventGeneric(druid, string(druidFinalizerFailed),
fmt.Sprintf("Finalizer [%s] failed for CR [%s] in namespace [%s]", deletePVCFinalizerName, druid.Name, druid.Namespace), err)

return err
}

eventEmitter.EmitEventGeneric(druid, string(druidFinalizerSuccess),
fmt.Sprintf("Finalizer [%s] success for CR [%s] in namespace [%s]", deletePVCFinalizerName, druid.Name, druid.Namespace), nil)

// remove our finalizer from the list and update it.
druid.ObjectMeta.Finalizers = RemoveString(druid.ObjectMeta.Finalizers, deletePVCFinalizerName)

_, err = writers.Update(ctx, sdk, druid, druid, eventEmitter)
if err != nil {
return err
}

}
return nil
}
141 changes: 141 additions & 0 deletions controllers/druid/finalizers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package druid

import (
"time"

druidv1alpha1 "github.com/datainfrahq/druid-operator/apis/druid/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/types"
)

// +kubebuilder:docs-gen:collapse=Imports

/*
finalizers_test
*/
var _ = Describe("Test finalizers logic", func() {
const (
filePath = "testdata/finalizers.yaml"
timeout = time.Second * 45
interval = time.Millisecond * 250
)

var (
druid = &druidv1alpha1.Druid{}
)

Context("When creating a druid cluster", func() {
It("Should create the druid object", func() {
By("Creating a new druid")
druidCR, err := readDruidClusterSpecFromFile(filePath)
Expect(err).Should(BeNil())
Expect(k8sClient.Create(ctx, druidCR)).To(Succeed())

By("Getting a newly created druid")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druidCR.Name, Namespace: druidCR.Namespace}, druid)
return err == nil
}, timeout, interval).Should(BeTrue())
})
It("Should add the delete PVC finalizer", func() {
By("Waiting for the finalizer to be created")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
if err == nil && ContainsString(druid.GetFinalizers(), deletePVCFinalizerName) {
return true
}
return false
}, timeout, interval).Should(BeTrue())
})
It("Should delete druid successfully", func() {
By("Waiting for the druid cluster to be deleted")
Expect(k8sClient.Delete(ctx, druid)).To(Succeed())
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
return err != nil
}, timeout, interval).Should(BeTrue())
})
})

Context("When creating a druid cluster with disablePVCDeletion", func() {
It("Should create the druid object", func() {
By("Creating a new druid")
druidCR, err := readDruidClusterSpecFromFile(filePath)
druidCR.Spec.DisablePVCDeletionFinalizer = true
Expect(err).Should(BeNil())
Expect(k8sClient.Create(ctx, druidCR)).To(Succeed())

By("Getting a newly created druid")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druidCR.Name, Namespace: druidCR.Namespace}, druid)
return err == nil
}, timeout, interval).Should(BeTrue())
})
It("Should not add the delete PVC finalizer", func() {
By("Call for the update finalizer function")
Expect(updateFinalizers(ctx, k8sClient, druid, emitEvent)).Should(BeNil())

By("Getting a updated druid")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
return err == nil
}, timeout, interval).Should(BeTrue())

By("Checking the absence of the finalizer")
Expect(ContainsString(druid.GetFinalizers(), deletePVCFinalizerName)).Should(BeFalse())
})
It("Should delete druid successfully", func() {
Expect(k8sClient.Delete(ctx, druid)).To(Succeed())
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
return err != nil
}, timeout, interval).Should(BeTrue())
})
})

Context("When creating a druid cluster", func() {
It("Should create the druid object", func() {
By("Creating a new druid")
druidCR, err := readDruidClusterSpecFromFile(filePath)
Expect(err).Should(BeNil())
Expect(k8sClient.Create(ctx, druidCR)).To(Succeed())

By("Getting the CR")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druidCR.Name, Namespace: druidCR.Namespace}, druid)
return err == nil
}, timeout, interval).Should(BeTrue())
})
It("Should add the delete PVC finalizer", func() {
By("Waiting for the finalizer to be created")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
if err == nil && ContainsString(druid.GetFinalizers(), deletePVCFinalizerName) {
return true
}
return false
}, timeout, interval).Should(BeTrue())
})
It("Should remove the delete PVC finalizer", func() {
By("Disabling the deletePVC finalizer")
druid.Spec.DisablePVCDeletionFinalizer = true
Expect(k8sClient.Update(ctx, druid)).To(BeNil())
By("Waiting for the finalizer to be deleted")
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
if err == nil && !ContainsString(druid.GetFinalizers(), deletePVCFinalizerName) {
return true
}
return false
}, timeout, interval).Should(BeTrue())
})
It("Should delete druid successfully", func() {
Expect(k8sClient.Delete(ctx, druid)).To(Succeed())
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: druid.Name, Namespace: druid.Namespace}, druid)
return err != nil
}, timeout, interval).Should(BeTrue())
})
})
})
90 changes: 6 additions & 84 deletions controllers/druid/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,35 +80,12 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui
return err
}

/*
Default Behavior: Finalizer shall be always executed resulting in deletion of pvc post deletion of Druid CR
When the object (druid CR) has for deletion time stamp set, execute the finalizer.
Finalizer shall execute the following flow :
1. Get sts List and PVC List
2. Range and Delete sts first and then delete pvc. PVC must be deleted after sts termination has been executed
else pvc finalizer shall block deletion since a pod/sts is referencing it.
3. Once delete is executed we block program and return.
*/

if m.Spec.DisablePVCDeletionFinalizer == false {
md := m.GetDeletionTimestamp() != nil
if md {
return executeFinalizers(ctx, sdk, m, emitEvents)
}
/*
If finalizer isn't present add it to object meta.
In case cr is already deleted do not call this function
*/
cr := checkIfCRExists(ctx, sdk, m, emitEvents)
if cr {
if !ContainsString(m.ObjectMeta.Finalizers, finalizerName) {
m.SetFinalizers(append(m.GetFinalizers(), finalizerName))
_, err := writers.Update(context.Background(), sdk, m, m, emitEvents)
if err != nil {
return err
}
}
}
if m.GetDeletionTimestamp() != nil {
return executeFinalizers(ctx, sdk, m, emitEvents)
}

if err := updateFinalizers(ctx, sdk, m, emitEvents); err != nil {
return err
}

for _, elem := range allNodeSpecs {
Expand Down Expand Up @@ -574,61 +551,6 @@ func setPVCLabels(ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, e
return nil
}

func executeFinalizers(ctx context.Context, sdk client.Client, m *v1alpha1.Druid, emitEvents EventEmitter) error {

if ContainsString(m.ObjectMeta.Finalizers, finalizerName) {
pvcLabels := map[string]string{
"druid_cr": m.Name,
}

pvcList, err := readers.List(ctx, sdk, m, pvcLabels, emitEvents, func() objectList { return &v1.PersistentVolumeClaimList{} }, func(listObj runtime.Object) []object {
items := listObj.(*v1.PersistentVolumeClaimList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return err
}

stsList, err := readers.List(ctx, sdk, m, makeLabelsForDruid(m.Name), emitEvents, func() objectList { return &appsv1.StatefulSetList{} }, func(listObj runtime.Object) []object {
items := listObj.(*appsv1.StatefulSetList).Items
result := make([]object, len(items))
for i := 0; i < len(items); i++ {
result[i] = &items[i]
}
return result
})
if err != nil {
return err
}

msg := fmt.Sprintf("Trigerring finalizer for CR [%s] in namespace [%s]", m.Name, m.Namespace)
// sendEvent(sdk, m, v1.EventTypeNormal, DruidFinalizer, msg)
logger.Info(msg)
if err := deleteSTSAndPVC(ctx, sdk, m, stsList, pvcList, emitEvents); err != nil {
return err
} else {
msg := fmt.Sprintf("Finalizer success for CR [%s] in namespace [%s]", m.Name, m.Namespace)
// sendEvent(sdk, m, v1.EventTypeNormal, DruidFinalizerSuccess, msg)
logger.Info(msg)
}

// remove our finalizer from the list and update it.
m.ObjectMeta.Finalizers = RemoveString(m.ObjectMeta.Finalizers, finalizerName)

_, err = writers.Update(ctx, sdk, m, m, emitEvents)
if err != nil {
return err
}

}
return nil

}

func execCheckCrashStatus(ctx context.Context, sdk client.Client, nodeSpec *v1alpha1.DruidNodeSpec, m *v1alpha1.Druid, event EventEmitter) {
if m.Spec.ForceDeleteStsPodOnError == false {
return
Expand Down
4 changes: 4 additions & 0 deletions controllers/druid/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ const (
druidNodePatchFail druidEventReason = "DruidOperatorPatchFail"
druidNodePatchSucess druidEventReason = "DruidOperatorPatchSuccess"
druidObjectListFail druidEventReason = "DruidOperatorListFail"

druidFinalizerTriggered druidEventReason = "DruidOperatorFinalizerTriggered"
druidFinalizerFailed druidEventReason = "DruidFinalizerFailed"
druidFinalizerSuccess druidEventReason = "DruidFinalizerSuccess"
)

// Reader Interface
Expand Down
3 changes: 3 additions & 0 deletions controllers/druid/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
emitEvent EventEmitter
)

func TestAPIs(t *testing.T) {
Expand Down Expand Up @@ -93,6 +94,8 @@ var _ = BeforeSuite(func() {
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

emitEvent = EmitEventFuncs{k8sManager.GetEventRecorderFor("druid-operator")}

go func() {
defer GinkgoRecover()
err = k8sManager.Start(ctx)
Expand Down
Loading

0 comments on commit da563de

Please sign in to comment.