From 1e41b4f2b924691f3fa609ee0a5fbe139e49cfd1 Mon Sep 17 00:00:00 2001
From: Yash Mehrotra
Date: Sat, 8 Jul 2023 10:16:22 +0530
Subject: [PATCH] feat: add reconcile jobs for deleted topologies and canaries

---
 pkg/controllers/canary_controller.go |  2 +-
 pkg/controllers/system_controller.go |  2 +-
 pkg/jobs/canary/canary_jobs.go       | 49 ++++++++++++++++++++++----
 pkg/jobs/jobs.go                     |  9 +++++
 pkg/jobs/system/system_jobs.go       | 52 +++++++++++++++++++++++-----
 5 files changed, 97 insertions(+), 17 deletions(-)

diff --git a/pkg/controllers/canary_controller.go b/pkg/controllers/canary_controller.go
index dc9e42c93..219570518 100644
--- a/pkg/controllers/canary_controller.go
+++ b/pkg/controllers/canary_controller.go
@@ -93,7 +93,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
 		if err := db.DeleteCanary(*canary); err != nil {
 			logger.Error(err, "failed to delete canary")
 		}
-		canaryJobs.DeleteCanaryJob(*canary)
+		canaryJobs.DeleteCanaryJob(canary.GetPersistedID())
 		controllerutil.RemoveFinalizer(canary, FinalizerName)
 		return ctrl.Result{}, r.Update(ctx, canary)
 	}
diff --git a/pkg/controllers/system_controller.go b/pkg/controllers/system_controller.go
index 7cab2d8ec..2100f57c7 100644
--- a/pkg/controllers/system_controller.go
+++ b/pkg/controllers/system_controller.go
@@ -63,7 +63,7 @@ func (r *TopologyReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request)
 		if err := db.DeleteTopology(topology); err != nil {
 			logger.Error(err, "failed to delete topology")
 		}
-		systemJobs.DeleteTopologyJob(*topology)
+		systemJobs.DeleteTopologyJob(topology.GetPersistedID())
 		controllerutil.RemoveFinalizer(topology, TopologyFinalizerName)
 		return ctrl.Result{}, r.Update(ctx, topology)
 	}
diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go
index 669fd235b..cdd7b39e9 100644
--- a/pkg/jobs/canary/canary_jobs.go
+++ b/pkg/jobs/canary/canary_jobs.go
@@ -214,9 +214,9 @@ type CanaryStatusPayload struct {
 	NamespacedName types.NamespacedName
 }
 
-func findCronEntry(canary v1.Canary) *cron.Entry {
+func findCronEntry(id string) *cron.Entry {
 	for _, entry := range CanaryScheduler.Entries() {
-		if entry.Job.(CanaryJob).GetPersistedID() == canary.GetPersistedID() {
+		if entry.Job.(CanaryJob).GetPersistedID() == id {
 			return &entry
 		}
 	}
@@ -247,7 +247,7 @@ var canaryUpdateTimeCache = sync.Map{}
 // TODO: Refactor to use database object instead of kubernetes
 func SyncCanaryJob(canary v1.Canary) error {
 	if !canary.DeletionTimestamp.IsZero() || canary.Spec.GetSchedule() == "@never" {
-		DeleteCanaryJob(canary)
+		DeleteCanaryJob(canary.GetPersistedID())
 		return nil
 	}
 
@@ -273,7 +273,7 @@ func SyncCanaryJob(canary v1.Canary) error {
 	}
 
 	updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String())
-	entry := findCronEntry(canary)
+	entry := findCronEntry(canary.GetPersistedID())
 	if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil {
 		// Remove entry if it exists
 		if entry != nil {
@@ -351,15 +351,50 @@ func SyncCanaryJobs() {
 	logger.Infof("Synced canary jobs %d", len(CanaryScheduler.Entries()))
 }
 
-func DeleteCanaryJob(canary v1.Canary) {
-	entry := findCronEntry(canary)
+func DeleteCanaryJob(id string) {
+	entry := findCronEntry(id)
 	if entry == nil {
 		return
 	}
-	logger.Tracef("deleting cron entry for canary %s/%s with entry ID: %v", canary.Name, canary.Namespace, entry.ID)
+	logger.Tracef("deleting cron entry for canary:%s with entry ID: %v", id, entry.ID)
 	CanaryScheduler.Remove(entry.ID)
 }
 
+func ReconcileDeletedCanaryChecks() {
+	jobHistory := models.NewJobHistory("ReconcileDeletedCanaryChecks", "", "").Start()
+	_ = db.PersistJobHistory(jobHistory)
+	defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()
+
+	var rows []struct {
+		ID        string
+		DeletedAt time.Time
+	}
+	// Select all canaries that are marked deleted but whose checks are not yet marked deleted
+	err := db.Gorm.Raw(`
+        SELECT DISTINCT(canaries.id), canaries.deleted_at
+        FROM canaries
+        INNER JOIN checks ON canaries.id = checks.canary_id
+        WHERE
+            checks.deleted_at IS NULL AND
+            canaries.deleted_at IS NOT NULL
+    `).Scan(&rows).Error
+
+	if err != nil {
+		logger.Errorf("Error fetching deleted canary checks: %v", err)
+		jobHistory.AddError(err.Error())
+		return
+	}
+
+	for _, r := range rows {
+		if err := db.DeleteChecksForCanary(r.ID, r.DeletedAt); err != nil {
+			logger.Errorf("Error deleting checks for canary[%s]: %v", r.ID, err)
+			jobHistory.AddError(err.Error())
+		}
+		DeleteCanaryJob(r.ID)
+	}
+	jobHistory.IncrSuccess()
+}
+
 func ScheduleFunc(schedule string, fn func()) (interface{}, error) {
 	return FuncScheduler.AddFunc(schedule, fn)
 }
diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go
index d9d49a0f9..32f87ab9e 100644
--- a/pkg/jobs/jobs.go
+++ b/pkg/jobs/jobs.go
@@ -28,6 +28,9 @@ const (
 	CheckCleanupSchedule           = "@every 12h"
 	CanaryCleanupSchedule          = "@every 12h"
 	PrometheusGaugeCleanupSchedule = "@every 1h"
+
+	ReconcileDeletedTopologyComponentsSchedule = "@every 1h"
+	ReconcileDeletedCanaryChecksSchedule       = "@every 1h"
 )
 
 func Start() {
@@ -79,6 +82,12 @@ func Start() {
 	if _, err := ScheduleFunc(CanaryCleanupSchedule, db.CleanupCanaries); err != nil {
 		logger.Errorf("Failed to schedule canary cleanup job: %v", err)
 	}
+	if _, err := ScheduleFunc(ReconcileDeletedTopologyComponentsSchedule, systemJobs.ReconcileDeletedTopologyComponents); err != nil {
+		logger.Errorf("Failed to schedule ReconcileDeletedTopologyComponents: %v", err)
+	}
+	if _, err := ScheduleFunc(ReconcileDeletedCanaryChecksSchedule, canaryJobs.ReconcileDeletedCanaryChecks); err != nil {
+		logger.Errorf("Failed to schedule ReconcileDeletedCanaryChecks: %v", err)
+	}
 
 	canaryJobs.CleanupMetricsGauges()
 	canaryJobs.SyncCanaryJobs()
diff --git a/pkg/jobs/system/system_jobs.go b/pkg/jobs/system/system_jobs.go
index 3487bd885..934a669e5 100644
--- a/pkg/jobs/system/system_jobs.go
+++ b/pkg/jobs/system/system_jobs.go
@@ -11,6 +11,7 @@ import (
 	"github.com/flanksource/canary-checker/pkg/db"
 	pkgTopology "github.com/flanksource/canary-checker/pkg/topology"
 	"github.com/flanksource/commons/logger"
+	"github.com/flanksource/duty/models"
 	"github.com/flanksource/kommons"
 	"github.com/robfig/cron/v3"
 	"k8s.io/apimachinery/pkg/types"
@@ -71,7 +72,7 @@ func SyncTopologyJobs() {
 
 func SyncTopologyJob(t v1.Topology) error {
 	if !t.DeletionTimestamp.IsZero() || t.Spec.GetSchedule() == "@never" {
-		DeleteTopologyJob(t)
+		DeleteTopologyJob(t.GetPersistedID())
 		return nil
 	}
 	if Kommons == nil {
@@ -81,7 +82,7 @@ func SyncTopologyJob(t v1.Topology) error {
 			logger.Warnf("Failed to get kommons client, features that read kubernetes config will fail: %v", err)
 		}
 	}
-	entry := findTopologyCronEntry(t)
+	entry := findTopologyCronEntry(t.GetPersistedID())
 	if entry != nil {
 		job := entry.Job.(TopologyJob)
 		if !reflect.DeepEqual(job.Topology.Spec, t.Spec) {
@@ -104,7 +105,7 @@ func SyncTopologyJob(t v1.Topology) error {
 		logger.Infof("Scheduled %s/%s: %s", t.Namespace, t.Name, t.Spec.GetSchedule())
 	}
 
-	entry = findTopologyCronEntry(t)
+	entry = findTopologyCronEntry(t.GetPersistedID())
 	if entry != nil && time.Until(entry.Next) < 1*time.Hour { // run all regular topologies on startup
 		job = entry.Job.(TopologyJob)
 		go job.Run()
@@ -113,20 +114,55 @@ func SyncTopologyJob(t v1.Topology) error {
 	return nil
 }
 
-func findTopologyCronEntry(t v1.Topology) *cron.Entry {
+func findTopologyCronEntry(id string) *cron.Entry {
 	for _, entry := range TopologyScheduler.Entries() {
-		if entry.Job.(TopologyJob).GetPersistedID() == t.GetPersistedID() {
+		if entry.Job.(TopologyJob).GetPersistedID() == id {
 			return &entry
 		}
 	}
 	return nil
 }
 
-func DeleteTopologyJob(t v1.Topology) {
-	entry := findTopologyCronEntry(t)
+func DeleteTopologyJob(id string) {
+	entry := findTopologyCronEntry(id)
 	if entry == nil {
 		return
 	}
-	logger.Tracef("deleting cron entry for topology %s/%s with entry ID: %v", t.Name, t.Namespace, entry.ID)
+	logger.Tracef("deleting cron entry for topology:%s with entry ID: %v", id, entry.ID)
 	TopologyScheduler.Remove(entry.ID)
 }
+
+func ReconcileDeletedTopologyComponents() {
+	jobHistory := models.NewJobHistory("ReconcileDeletedTopologyComponents", "", "").Start()
+	_ = db.PersistJobHistory(jobHistory)
+	defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()
+
+	var rows []struct {
+		ID        string
+		DeletedAt time.Time
+	}
+	// Select all topologies that are marked deleted but whose components are not yet marked deleted
+	err := db.Gorm.Raw(`
+        SELECT DISTINCT(topologies.id), topologies.deleted_at as deleted_at
+        FROM topologies
+        INNER JOIN components ON topologies.id = components.topology_id
+        WHERE
+            components.deleted_at IS NULL AND
+            topologies.deleted_at IS NOT NULL
+    `).Scan(&rows).Error
+
+	if err != nil {
+		logger.Errorf("Error fetching deleted topology components: %v", err)
+		jobHistory.AddError(err.Error())
+		return
+	}
+
+	for _, r := range rows {
+		if err := db.DeleteComponentsOfTopology(r.ID, r.DeletedAt); err != nil {
+			logger.Errorf("Error deleting components for topology[%s]: %v", r.ID, err)
+			jobHistory.AddError(err.Error())
+		}
+		DeleteTopologyJob(r.ID)
+	}
+	jobHistory.IncrSuccess()
+}