Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add reconcile jobs for deleted topologies and canaries #1132

Merged
merged 2 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
if err := db.DeleteCanary(*canary); err != nil {
logger.Error(err, "failed to delete canary")
}
canaryJobs.DeleteCanaryJob(*canary)
canaryJobs.DeleteCanaryJob(canary.GetPersistedID())
controllerutil.RemoveFinalizer(canary, FinalizerName)
return ctrl.Result{}, r.Update(ctx, canary)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/system_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (r *TopologyReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request)
if err := db.DeleteTopology(topology); err != nil {
logger.Error(err, "failed to delete topology")
}
systemJobs.DeleteTopologyJob(*topology)
systemJobs.DeleteTopologyJob(topology.GetPersistedID())
controllerutil.RemoveFinalizer(topology, TopologyFinalizerName)
return ctrl.Result{}, r.Update(ctx, topology)
}
Expand Down
49 changes: 42 additions & 7 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ type CanaryStatusPayload struct {
NamespacedName types.NamespacedName
}

func findCronEntry(canary v1.Canary) *cron.Entry {
func findCronEntry(id string) *cron.Entry {
for _, entry := range CanaryScheduler.Entries() {
if entry.Job.(CanaryJob).GetPersistedID() == canary.GetPersistedID() {
if entry.Job.(CanaryJob).GetPersistedID() == id {
return &entry
}
}
Expand Down Expand Up @@ -247,7 +247,7 @@ var canaryUpdateTimeCache = sync.Map{}
// TODO: Refactor to use database object instead of kubernetes
func SyncCanaryJob(canary v1.Canary) error {
if !canary.DeletionTimestamp.IsZero() || canary.Spec.GetSchedule() == "@never" {
DeleteCanaryJob(canary)
DeleteCanaryJob(canary.GetPersistedID())
return nil
}

Expand All @@ -273,7 +273,7 @@ func SyncCanaryJob(canary v1.Canary) error {
}

updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String())
entry := findCronEntry(canary)
entry := findCronEntry(canary.GetPersistedID())
if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil {
// Remove entry if it exists
if entry != nil {
Expand Down Expand Up @@ -351,15 +351,50 @@ func SyncCanaryJobs() {
logger.Infof("Synced canary jobs %d", len(CanaryScheduler.Entries()))
}

func DeleteCanaryJob(canary v1.Canary) {
entry := findCronEntry(canary)
func DeleteCanaryJob(id string) {
entry := findCronEntry(id)
if entry == nil {
return
}
logger.Tracef("deleting cron entry for canary %s/%s with entry ID: %v", canary.Name, canary.Namespace, entry.ID)
logger.Tracef("deleting cron entry for canary:%s with entry ID: %v", id, entry.ID)
CanaryScheduler.Remove(entry.ID)
}

// ReconcileDeletedCanaryChecks finds canaries that have a deleted_at timestamp
// but still own checks whose deleted_at is NULL. For each such canary it calls
// db.DeleteChecksForCanary with the canary's deletion time and then removes the
// canary's cron entry via DeleteCanaryJob.
//
// The run is recorded as a job history entry named "ReconcileDeletedCanaryChecks";
// query and per-canary failures are logged and attached to that entry.
func ReconcileDeletedCanaryChecks() {
	// The history name must match this job; it previously reused the topology
	// job's name, which made the two jobs indistinguishable in job history.
	jobHistory := models.NewJobHistory("ReconcileDeletedCanaryChecks", "", "").Start()
	_ = db.PersistJobHistory(jobHistory)
	defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

	var rows []struct {
		ID        string
		DeletedAt time.Time
	}
	// Select all canaries that are marked as deleted but still have checks
	// whose deleted_at is not set.
	err := db.Gorm.Raw(`
SELECT DISTINCT(canaries.id), canaries.deleted_at
FROM canaries
INNER JOIN checks ON canaries.id = checks.canary_id
WHERE
checks.deleted_at IS NULL AND
canaries.deleted_at IS NOT NULL
`).Scan(&rows).Error

	if err != nil {
		logger.Errorf("Error fetching deleted canary checks: %v", err)
		jobHistory.AddError(err.Error())
		return
	}

	for _, r := range rows {
		if err := db.DeleteChecksForCanary(r.ID, r.DeletedAt); err != nil {
			logger.Errorf("Error deleting checks for canary[%s]: %v", r.ID, err)
			jobHistory.AddError(err.Error())
		}
		// The canary itself is already deleted, so its cron entry is removed
		// even when cleaning up its checks failed.
		DeleteCanaryJob(r.ID)
	}
	// NOTE(review): success is incremented once per run, even if some rows
	// reported errors above — confirm this is the intended accounting.
	jobHistory.IncrSuccess()
}

// ScheduleFunc registers fn with FuncScheduler under the given schedule spec.
// It returns the values produced by FuncScheduler.AddFunc unchanged: an opaque
// entry identifier and any error from registering the function.
func ScheduleFunc(schedule string, fn func()) (interface{}, error) {
	entryID, err := FuncScheduler.AddFunc(schedule, fn)
	return entryID, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
CheckCleanupSchedule = "@every 12h"
CanaryCleanupSchedule = "@every 12h"
PrometheusGaugeCleanupSchedule = "@every 1h"

ReconcileDeletedTopologyComponentsSchedule = "@every 1h"
ReconcileDeletedCanaryChecksSchedule = "@every 1h"
)

func Start() {
Expand Down Expand Up @@ -79,6 +82,12 @@ func Start() {
if _, err := ScheduleFunc(CanaryCleanupSchedule, db.CleanupCanaries); err != nil {
logger.Errorf("Failed to schedule canary cleanup job: %v", err)
}
if _, err := ScheduleFunc(ReconcileDeletedTopologyComponentsSchedule, systemJobs.ReconcileDeletedTopologyComponents); err != nil {
logger.Errorf("Failed to schedule ReconcileDeletedTopologyComponents: %v", err)
}
if _, err := ScheduleFunc(ReconcileDeletedCanaryChecksSchedule, canaryJobs.ReconcileDeletedCanaryChecks); err != nil {
logger.Errorf("Failed to schedule ReconcileDeletedCanaryChecks: %v", err)
}

canaryJobs.CleanupMetricsGauges()
canaryJobs.SyncCanaryJobs()
Expand Down
52 changes: 44 additions & 8 deletions pkg/jobs/system/system_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flanksource/canary-checker/pkg/db"
pkgTopology "github.com/flanksource/canary-checker/pkg/topology"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/models"
"github.com/flanksource/kommons"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -71,7 +72,7 @@ func SyncTopologyJobs() {

func SyncTopologyJob(t v1.Topology) error {
if !t.DeletionTimestamp.IsZero() || t.Spec.GetSchedule() == "@never" {
DeleteTopologyJob(t)
DeleteTopologyJob(t.GetPersistedID())
return nil
}
if Kommons == nil {
Expand All @@ -81,7 +82,7 @@ func SyncTopologyJob(t v1.Topology) error {
logger.Warnf("Failed to get kommons client, features that read kubernetes config will fail: %v", err)
}
}
entry := findTopologyCronEntry(t)
entry := findTopologyCronEntry(t.GetPersistedID())
if entry != nil {
job := entry.Job.(TopologyJob)
if !reflect.DeepEqual(job.Topology.Spec, t.Spec) {
Expand All @@ -104,7 +105,7 @@ func SyncTopologyJob(t v1.Topology) error {
logger.Infof("Scheduled %s/%s: %s", t.Namespace, t.Name, t.Spec.GetSchedule())
}

entry = findTopologyCronEntry(t)
entry = findTopologyCronEntry(t.GetPersistedID())
if entry != nil && time.Until(entry.Next) < 1*time.Hour {
// run all regular topologies on startup
job = entry.Job.(TopologyJob)
Expand All @@ -113,20 +114,55 @@ func SyncTopologyJob(t v1.Topology) error {
return nil
}

func findTopologyCronEntry(t v1.Topology) *cron.Entry {
func findTopologyCronEntry(id string) *cron.Entry {
for _, entry := range TopologyScheduler.Entries() {
if entry.Job.(TopologyJob).GetPersistedID() == t.GetPersistedID() {
if entry.Job.(TopologyJob).GetPersistedID() == id {
return &entry
}
}
return nil
}

func DeleteTopologyJob(t v1.Topology) {
entry := findTopologyCronEntry(t)
func DeleteTopologyJob(id string) {
entry := findTopologyCronEntry(id)
if entry == nil {
return
}
logger.Tracef("deleting cron entry for topology %s/%s with entry ID: %v", t.Name, t.Namespace, entry.ID)
logger.Tracef("deleting cron entry for topology:%s with entry ID: %v", id, entry.ID)
TopologyScheduler.Remove(entry.ID)
}

// ReconcileDeletedTopologyComponents looks for topologies that carry a
// deleted_at timestamp while still having components whose deleted_at is NULL.
// Each match is passed to db.DeleteComponentsOfTopology together with the
// topology's deletion time, after which the topology's cron entry is removed
// through DeleteTopologyJob.
//
// The run is tracked as a job history entry; a failing query aborts the run,
// while per-topology failures are logged, recorded, and do not stop the loop.
func ReconcileDeletedTopologyComponents() {
	history := models.NewJobHistory("ReconcileDeletedTopologyComponents", "", "").Start()
	_ = db.PersistJobHistory(history)
	defer func() {
		// Persist again on exit so the end state of the run is stored.
		_ = db.PersistJobHistory(history.End())
	}()

	// Deleted topologies that still own at least one component that has not
	// been marked as deleted.
	const staleTopologiesQuery = `
SELECT DISTINCT(topologies.id), topologies.deleted_at as deleted_at
FROM topologies
INNER JOIN components ON topologies.id = components.topology_id
WHERE
components.deleted_at IS NULL AND
topologies.deleted_at IS NOT NULL
`

	var stale []struct {
		ID        string
		DeletedAt time.Time
	}
	if err := db.Gorm.Raw(staleTopologiesQuery).Scan(&stale).Error; err != nil {
		logger.Errorf("Error fetching deleted topology components: %v", err)
		history.AddError(err.Error())
		return
	}

	for i := range stale {
		topologyID, deletedAt := stale[i].ID, stale[i].DeletedAt
		if err := db.DeleteComponentsOfTopology(topologyID, deletedAt); err != nil {
			logger.Errorf("Error deleting components for topology[%s]: %v", topologyID, err)
			history.AddError(err.Error())
		}
		// The cron entry is dropped regardless of the component cleanup result.
		DeleteTopologyJob(topologyID)
	}
	history.IncrSuccess()
}
Loading