Skip to content

Commit 189f346

Browse files
authored
Merge pull request #1132 from flanksource/reconcile-delete-jobs
feat: add reconcile jobs for deleted topologies and canaries
2 parents 0249936 + 877aa98 commit 189f346

File tree

5 files changed

+97
-17
lines changed

5 files changed

+97
-17
lines changed

pkg/controllers/canary_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
9393
if err := db.DeleteCanary(*canary); err != nil {
9494
logger.Error(err, "failed to delete canary")
9595
}
96-
canaryJobs.DeleteCanaryJob(*canary)
96+
canaryJobs.DeleteCanaryJob(canary.GetPersistedID())
9797
controllerutil.RemoveFinalizer(canary, FinalizerName)
9898
return ctrl.Result{}, r.Update(ctx, canary)
9999
}

pkg/controllers/system_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (r *TopologyReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request)
6363
if err := db.DeleteTopology(topology); err != nil {
6464
logger.Error(err, "failed to delete topology")
6565
}
66-
systemJobs.DeleteTopologyJob(*topology)
66+
systemJobs.DeleteTopologyJob(topology.GetPersistedID())
6767
controllerutil.RemoveFinalizer(topology, TopologyFinalizerName)
6868
return ctrl.Result{}, r.Update(ctx, topology)
6969
}

pkg/jobs/canary/canary_jobs.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,9 @@ type CanaryStatusPayload struct {
214214
NamespacedName types.NamespacedName
215215
}
216216

217-
func findCronEntry(canary v1.Canary) *cron.Entry {
217+
func findCronEntry(id string) *cron.Entry {
218218
for _, entry := range CanaryScheduler.Entries() {
219-
if entry.Job.(CanaryJob).GetPersistedID() == canary.GetPersistedID() {
219+
if entry.Job.(CanaryJob).GetPersistedID() == id {
220220
return &entry
221221
}
222222
}
@@ -247,7 +247,7 @@ var canaryUpdateTimeCache = sync.Map{}
247247
// TODO: Refactor to use database object instead of kubernetes
248248
func SyncCanaryJob(canary v1.Canary) error {
249249
if !canary.DeletionTimestamp.IsZero() || canary.Spec.GetSchedule() == "@never" {
250-
DeleteCanaryJob(canary)
250+
DeleteCanaryJob(canary.GetPersistedID())
251251
return nil
252252
}
253253

@@ -273,7 +273,7 @@ func SyncCanaryJob(canary v1.Canary) error {
273273
}
274274

275275
updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String())
276-
entry := findCronEntry(canary)
276+
entry := findCronEntry(canary.GetPersistedID())
277277
if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil {
278278
// Remove entry if it exists
279279
if entry != nil {
@@ -351,15 +351,50 @@ func SyncCanaryJobs() {
351351
logger.Infof("Synced canary jobs %d", len(CanaryScheduler.Entries()))
352352
}
353353

354-
func DeleteCanaryJob(canary v1.Canary) {
355-
entry := findCronEntry(canary)
354+
func DeleteCanaryJob(id string) {
355+
entry := findCronEntry(id)
356356
if entry == nil {
357357
return
358358
}
359-
logger.Tracef("deleting cron entry for canary %s/%s with entry ID: %v", canary.Name, canary.Namespace, entry.ID)
359+
logger.Tracef("deleting cron entry for canary:%s with entry ID: %v", id, entry.ID)
360360
CanaryScheduler.Remove(entry.ID)
361361
}
362362

363+
func ReconcileDeletedCanaryChecks() {
364+
jobHistory := models.NewJobHistory("ReconcileDeletedTopologyComponents", "", "").Start()
365+
_ = db.PersistJobHistory(jobHistory)
366+
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()
367+
368+
var rows []struct {
369+
ID string
370+
DeletedAt time.Time
371+
}
372+
// Select all components whose topology ID is deleted but their deleted at is not marked
373+
err := db.Gorm.Raw(`
374+
SELECT DISTINCT(canaries.id), canaries.deleted_at
375+
FROM canaries
376+
INNER JOIN checks ON canaries.id = checks.canary_id
377+
WHERE
378+
checks.deleted_at IS NULL AND
379+
canaries.deleted_at IS NOT NULL
380+
`).Scan(&rows).Error
381+
382+
if err != nil {
383+
logger.Errorf("Error fetching deleted canary checks: %v", err)
384+
jobHistory.AddError(err.Error())
385+
return
386+
}
387+
388+
for _, r := range rows {
389+
if err := db.DeleteChecksForCanary(r.ID, r.DeletedAt); err != nil {
390+
logger.Errorf("Error deleting checks for canary[%s]: %v", r.ID, err)
391+
jobHistory.AddError(err.Error())
392+
}
393+
DeleteCanaryJob(r.ID)
394+
}
395+
jobHistory.IncrSuccess()
396+
}
397+
363398
func ScheduleFunc(schedule string, fn func()) (interface{}, error) {
364399
return FuncScheduler.AddFunc(schedule, fn)
365400
}

pkg/jobs/jobs.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const (
2828
CheckCleanupSchedule = "@every 12h"
2929
CanaryCleanupSchedule = "@every 12h"
3030
PrometheusGaugeCleanupSchedule = "@every 1h"
31+
32+
ReconcileDeletedTopologyComponentsSchedule = "@every 1h"
33+
ReconcileDeletedCanaryChecksSchedule = "@every 1h"
3134
)
3235

3336
func Start() {
@@ -79,6 +82,12 @@ func Start() {
7982
if _, err := ScheduleFunc(CanaryCleanupSchedule, db.CleanupCanaries); err != nil {
8083
logger.Errorf("Failed to schedule canary cleanup job: %v", err)
8184
}
85+
if _, err := ScheduleFunc(ReconcileDeletedTopologyComponentsSchedule, systemJobs.ReconcileDeletedTopologyComponents); err != nil {
86+
logger.Errorf("Failed to schedule ReconcileDeletedTopologyComponents: %v", err)
87+
}
88+
if _, err := ScheduleFunc(ReconcileDeletedCanaryChecksSchedule, canaryJobs.ReconcileDeletedCanaryChecks); err != nil {
89+
logger.Errorf("Failed to schedule ReconcileDeletedCanaryChecks: %v", err)
90+
}
8291

8392
canaryJobs.CleanupMetricsGauges()
8493
canaryJobs.SyncCanaryJobs()

pkg/jobs/system/system_jobs.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/flanksource/canary-checker/pkg/db"
1212
pkgTopology "github.com/flanksource/canary-checker/pkg/topology"
1313
"github.com/flanksource/commons/logger"
14+
"github.com/flanksource/duty/models"
1415
"github.com/flanksource/kommons"
1516
"github.com/robfig/cron/v3"
1617
"k8s.io/apimachinery/pkg/types"
@@ -71,7 +72,7 @@ func SyncTopologyJobs() {
7172

7273
func SyncTopologyJob(t v1.Topology) error {
7374
if !t.DeletionTimestamp.IsZero() || t.Spec.GetSchedule() == "@never" {
74-
DeleteTopologyJob(t)
75+
DeleteTopologyJob(t.GetPersistedID())
7576
return nil
7677
}
7778
if Kommons == nil {
@@ -81,7 +82,7 @@ func SyncTopologyJob(t v1.Topology) error {
8182
logger.Warnf("Failed to get kommons client, features that read kubernetes config will fail: %v", err)
8283
}
8384
}
84-
entry := findTopologyCronEntry(t)
85+
entry := findTopologyCronEntry(t.GetPersistedID())
8586
if entry != nil {
8687
job := entry.Job.(TopologyJob)
8788
if !reflect.DeepEqual(job.Topology.Spec, t.Spec) {
@@ -104,7 +105,7 @@ func SyncTopologyJob(t v1.Topology) error {
104105
logger.Infof("Scheduled %s/%s: %s", t.Namespace, t.Name, t.Spec.GetSchedule())
105106
}
106107

107-
entry = findTopologyCronEntry(t)
108+
entry = findTopologyCronEntry(t.GetPersistedID())
108109
if entry != nil && time.Until(entry.Next) < 1*time.Hour {
109110
// run all regular topologies on startup
110111
job = entry.Job.(TopologyJob)
@@ -113,20 +114,55 @@ func SyncTopologyJob(t v1.Topology) error {
113114
return nil
114115
}
115116

116-
func findTopologyCronEntry(t v1.Topology) *cron.Entry {
117+
func findTopologyCronEntry(id string) *cron.Entry {
117118
for _, entry := range TopologyScheduler.Entries() {
118-
if entry.Job.(TopologyJob).GetPersistedID() == t.GetPersistedID() {
119+
if entry.Job.(TopologyJob).GetPersistedID() == id {
119120
return &entry
120121
}
121122
}
122123
return nil
123124
}
124125

125-
func DeleteTopologyJob(t v1.Topology) {
126-
entry := findTopologyCronEntry(t)
126+
func DeleteTopologyJob(id string) {
127+
entry := findTopologyCronEntry(id)
127128
if entry == nil {
128129
return
129130
}
130-
logger.Tracef("deleting cron entry for topology %s/%s with entry ID: %v", t.Name, t.Namespace, entry.ID)
131+
logger.Tracef("deleting cron entry for topology:%s with entry ID: %v", id, entry.ID)
131132
TopologyScheduler.Remove(entry.ID)
132133
}
134+
135+
func ReconcileDeletedTopologyComponents() {
136+
jobHistory := models.NewJobHistory("ReconcileDeletedTopologyComponents", "", "").Start()
137+
_ = db.PersistJobHistory(jobHistory)
138+
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()
139+
140+
var rows []struct {
141+
ID string
142+
DeletedAt time.Time
143+
}
144+
// Select all components whose topology ID is deleted but their deleted at is not marked
145+
err := db.Gorm.Raw(`
146+
SELECT DISTINCT(topologies.id), topologies.deleted_at as deleted_at
147+
FROM topologies
148+
INNER JOIN components ON topologies.id = components.topology_id
149+
WHERE
150+
components.deleted_at IS NULL AND
151+
topologies.deleted_at IS NOT NULL
152+
`).Scan(&rows).Error
153+
154+
if err != nil {
155+
logger.Errorf("Error fetching deleted topology components: %v", err)
156+
jobHistory.AddError(err.Error())
157+
return
158+
}
159+
160+
for _, r := range rows {
161+
if err := db.DeleteComponentsOfTopology(r.ID, r.DeletedAt); err != nil {
162+
logger.Errorf("Error deleting components for topology[%s]: %v", r.ID, err)
163+
jobHistory.AddError(err.Error())
164+
}
165+
DeleteTopologyJob(r.ID)
166+
}
167+
jobHistory.IncrSuccess()
168+
}

0 commit comments

Comments
 (0)