Skip to content

Commit

Permalink
feat: add reconcile jobs for deleted topologies and canaries
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Jul 8, 2023
1 parent 57ecd2c commit 4b84b14
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/system_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (r *TopologyReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request)
if err := db.DeleteTopology(topology); err != nil {
logger.Error(err, "failed to delete topology")
}
systemJobs.DeleteTopologyJob(*topology)
systemJobs.DeleteTopologyJob(topology.GetPersistedID())
controllerutil.RemoveFinalizer(topology, TopologyFinalizerName)
return ctrl.Result{}, r.Update(ctx, topology)
}
Expand Down
49 changes: 42 additions & 7 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ type CanaryStatusPayload struct {
NamespacedName types.NamespacedName
}

func findCronEntry(canary v1.Canary) *cron.Entry {
func findCronEntry(id string) *cron.Entry {
for _, entry := range CanaryScheduler.Entries() {
if entry.Job.(CanaryJob).GetPersistedID() == canary.GetPersistedID() {
if entry.Job.(CanaryJob).GetPersistedID() == id {
return &entry
}
}
Expand Down Expand Up @@ -247,7 +247,7 @@ var canaryUpdateTimeCache = sync.Map{}
// TODO: Refactor to use database object instead of kubernetes
func SyncCanaryJob(canary v1.Canary) error {
if !canary.DeletionTimestamp.IsZero() || canary.Spec.GetSchedule() == "@never" {
DeleteCanaryJob(canary)
DeleteCanaryJob(canary.GetPersistedID())
return nil
}

Expand All @@ -273,7 +273,7 @@ func SyncCanaryJob(canary v1.Canary) error {
}

updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String())
entry := findCronEntry(canary)
entry := findCronEntry(canary.GetPersistedID())
if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil {
// Remove entry if it exists
if entry != nil {
Expand Down Expand Up @@ -351,15 +351,50 @@ func SyncCanaryJobs() {
logger.Infof("Synced canary jobs %d", len(CanaryScheduler.Entries()))
}

func DeleteCanaryJob(canary v1.Canary) {
entry := findCronEntry(canary)
func DeleteCanaryJob(id string) {
entry := findCronEntry(id)
if entry == nil {
return
}
logger.Tracef("deleting cron entry for canary %s/%s with entry ID: %v", canary.Name, canary.Namespace, entry.ID)
logger.Tracef("deleting cron entry for canary:%s with entry ID: %v", id, entry.ID)
CanaryScheduler.Remove(entry.ID)
}

// ReconcileDeletedCanaryChecks soft-deletes checks whose parent canary has
// already been soft-deleted, and removes the corresponding cron entries so
// those canaries are no longer scheduled.
func ReconcileDeletedCanaryChecks() {
	// BUG FIX: the job history name was copy-pasted from the topology
	// reconciler ("ReconcileDeletedTopologyComponents"); use this job's name.
	jobHistory := models.NewJobHistory("ReconcileDeletedCanaryChecks", "", "").Start()
	_ = db.PersistJobHistory(jobHistory)
	defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

	var rows []struct {
		ID        string
		DeletedAt time.Time
	}
	// Select all canaries that are soft-deleted but still have checks
	// whose deleted_at is not set.
	err := db.Gorm.Raw(`
        SELECT DISTINCT(canaries.id), canaries.deleted_at
        FROM canaries
        INNER JOIN checks ON canaries.id = checks.canary_id
        WHERE
            checks.deleted_at IS NULL AND
            canaries.deleted_at IS NOT NULL
        `).Scan(&rows).Error

	if err != nil {
		logger.Errorf("Error fetching deleted canary checks: %v", err)
		jobHistory.AddError(err.Error())
		return
	}

	for _, r := range rows {
		// Mark the canary's checks deleted with the same timestamp as the canary.
		if err := db.DeleteChecksForCanary(r.ID, r.DeletedAt); err != nil {
			logger.Errorf("Error deleting checks for canary[%s]: %v", r.ID, err)
			jobHistory.AddError(err.Error())
		}
		// Also unschedule the canary so it stops running.
		DeleteCanaryJob(r.ID)
	}
	jobHistory.IncrSuccess()
}

// ScheduleFunc registers fn with the function scheduler on the given cron
// schedule and returns the resulting entry ID.
func ScheduleFunc(schedule string, fn func()) (interface{}, error) {
	entryID, err := FuncScheduler.AddFunc(schedule, fn)
	return entryID, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
CheckCleanupSchedule = "@every 12h"
CanaryCleanupSchedule = "@every 12h"
PrometheusGaugeCleanupSchedule = "@every 1h"

ReconcileDeletedTopologyComponentsSchedule = "@every 1h"
ReconcileDeletedCanaryChecksSchedule = "@every 1h"
)

func Start() {
Expand Down Expand Up @@ -79,6 +82,12 @@ func Start() {
if _, err := ScheduleFunc(CanaryCleanupSchedule, db.CleanupCanaries); err != nil {
logger.Errorf("Failed to schedule canary cleanup job: %v", err)
}
if _, err := ScheduleFunc(ReconcileDeletedTopologyComponentsSchedule, systemJobs.ReconcileDeletedTopologyComponents); err != nil {
logger.Errorf("Failed to schedule ReconcileDeletedTopologyComponents: %v", err)
}
if _, err := ScheduleFunc(ReconcileDeletedCanaryChecksSchedule, canaryJobs.ReconcileDeletedCanaryChecks); err != nil {
logger.Errorf("Failed to schedule ReconcileDeletedCanaryChecks: %v", err)
}

canaryJobs.CleanupMetricsGauges()
canaryJobs.SyncCanaryJobs()
Expand Down
52 changes: 44 additions & 8 deletions pkg/jobs/system/system_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flanksource/canary-checker/pkg/db"
pkgTopology "github.com/flanksource/canary-checker/pkg/topology"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/models"
"github.com/flanksource/kommons"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -71,7 +72,7 @@ func SyncTopologyJobs() {

func SyncTopologyJob(t v1.Topology) error {
if !t.DeletionTimestamp.IsZero() || t.Spec.GetSchedule() == "@never" {
DeleteTopologyJob(t)
DeleteTopologyJob(t.GetPersistedID())
return nil
}
if Kommons == nil {
Expand All @@ -81,7 +82,7 @@ func SyncTopologyJob(t v1.Topology) error {
logger.Warnf("Failed to get kommons client, features that read kubernetes config will fail: %v", err)
}
}
entry := findTopologyCronEntry(t)
entry := findTopologyCronEntry(t.GetPersistedID())
if entry != nil {
job := entry.Job.(TopologyJob)
if !reflect.DeepEqual(job.Topology.Spec, t.Spec) {
Expand All @@ -104,7 +105,7 @@ func SyncTopologyJob(t v1.Topology) error {
logger.Infof("Scheduled %s/%s: %s", t.Namespace, t.Name, t.Spec.GetSchedule())
}

entry = findTopologyCronEntry(t)
entry = findTopologyCronEntry(t.GetPersistedID())
if entry != nil && time.Until(entry.Next) < 1*time.Hour {
// run all regular topologies on startup
job = entry.Job.(TopologyJob)
Expand All @@ -113,20 +114,55 @@ func SyncTopologyJob(t v1.Topology) error {
return nil
}

func findTopologyCronEntry(t v1.Topology) *cron.Entry {
func findTopologyCronEntry(id string) *cron.Entry {
for _, entry := range TopologyScheduler.Entries() {
if entry.Job.(TopologyJob).GetPersistedID() == t.GetPersistedID() {
if entry.Job.(TopologyJob).GetPersistedID() == id {
return &entry
}
}
return nil
}

func DeleteTopologyJob(t v1.Topology) {
entry := findTopologyCronEntry(t)
func DeleteTopologyJob(id string) {
entry := findTopologyCronEntry(id)
if entry == nil {
return
}
logger.Tracef("deleting cron entry for topology %s/%s with entry ID: %v", t.Name, t.Namespace, entry.ID)
logger.Tracef("deleting cron entry for topology:%s with entry ID: %v", id, entry.ID)
TopologyScheduler.Remove(entry.ID)
}

// ReconcileDeletedTopologyComponents soft-deletes components whose parent
// topology has already been soft-deleted, and removes the corresponding cron
// entries so those topologies are no longer scheduled.
func ReconcileDeletedTopologyComponents() {
	jobHistory := models.NewJobHistory("ReconcileDeletedTopologyComponents", "", "").Start()
	_ = db.PersistJobHistory(jobHistory)
	defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

	var rows []struct {
		ID        string
		DeletedAt time.Time
	}
	// Find topologies that are soft-deleted but still have components whose
	// deleted_at is not yet set.
	query := `
        SELECT DISTINCT(topologies.id), topologies.deleted_at as deleted_at
        FROM topologies
        INNER JOIN components ON topologies.id = components.topology_id
        WHERE
            components.deleted_at IS NULL AND
            topologies.deleted_at IS NOT NULL
        `
	if err := db.Gorm.Raw(query).Scan(&rows).Error; err != nil {
		logger.Errorf("Error fetching deleted topology components: %v", err)
		jobHistory.AddError(err.Error())
		return
	}

	for _, r := range rows {
		// Mark the topology's components deleted with the topology's timestamp.
		if err := db.DeleteComponentsOfTopology(r.ID, r.DeletedAt); err != nil {
			logger.Errorf("Error deleting components for topology[%s]: %v", r.ID, err)
			jobHistory.AddError(err.Error())
		}
		// Also unschedule the topology so it stops running.
		DeleteTopologyJob(r.ID)
	}
	jobHistory.IncrSuccess()
}

0 comments on commit 4b84b14

Please sign in to comment.