diff --git a/pkg/api.go b/pkg/api.go index fa05ba336..133a4d85f 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -7,11 +7,11 @@ import ( "github.com/flanksource/canary-checker/api/external" v1 "github.com/flanksource/canary-checker/api/v1" - "github.com/flanksource/canary-checker/pkg/db/types" "github.com/flanksource/canary-checker/pkg/labels" "github.com/flanksource/canary-checker/pkg/utils" "github.com/flanksource/commons/console" "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/types" "github.com/google/uuid" "github.com/lib/pq" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/controllers/canary_controller.go b/pkg/controllers/canary_controller.go index 8ea3c6610..3e7b1d4b6 100644 --- a/pkg/controllers/canary_controller.go +++ b/pkg/controllers/canary_controller.go @@ -87,7 +87,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c } if !canary.DeletionTimestamp.IsZero() { - if err := db.DeleteCanary(*canary); err != nil { + if err := db.DeleteCanary(canary.GetPersistedID(), canary.DeletionTimestamp.Time); err != nil { logger.Error(err, "failed to delete canary") } canaryJobs.DeleteCanaryJob(canary.GetPersistedID()) @@ -95,7 +95,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c return ctrl.Result{}, r.Update(ctx, canary) } - dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+string(canary.ObjectMeta.UID)) + dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID()) if err != nil { return ctrl.Result{Requeue: true}, err } diff --git a/pkg/db/canary.go b/pkg/db/canary.go index dbd2b0642..2bf06b4a5 100644 --- a/pkg/db/canary.go +++ b/pkg/db/canary.go @@ -16,6 +16,7 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/duty" "github.com/flanksource/duty/models" + dutyTypes "github.com/flanksource/duty/types" "github.com/google/uuid" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -182,37 +183,38 @@ func RemoveTransformedChecks(ids []string) error { Error } -func DeleteCanary(canary v1.Canary) error { - logger.Infof("deleting canary %s/%s", canary.Namespace, canary.Name) - model, err := pkg.CanaryFromV1(canary) - if err != nil { - return err - } - deleteTime := time.Now() - persistedID := canary.GetPersistedID() - var checkIDs []string - for _, checkID := range canary.Status.Checks { - checkIDs = append(checkIDs, checkID) - } - metrics.UnregisterGauge(checkIDs) - if persistedID == "" { - logger.Errorf("Canary %s/%s has not been persisted", canary.Namespace, canary.Name) - return nil - } - if err := Gorm.Where("id = ?", persistedID).Find(&model).UpdateColumn("deleted_at", deleteTime).Error; err != nil { +func DeleteCanary(id string, deleteTime time.Time) error { + logger.Infof("Deleting canary[%s]", id) + + if err := Gorm.Table("canaries").Where("id = ?", id).UpdateColumn("deleted_at", deleteTime).Error; err != nil { return err } - if err := DeleteChecksForCanary(persistedID, deleteTime); err != nil { + checkIDs, err := DeleteChecksForCanary(id, deleteTime) + if err != nil { return err } - if err := DeleteCheckComponentRelationshipsForCanary(persistedID, deleteTime); err != nil { + metrics.UnregisterGauge(checkIDs) + + if err := DeleteCheckComponentRelationshipsForCanary(id, deleteTime); err != nil { return err } return nil } -func DeleteChecksForCanary(id string, deleteTime time.Time) error { - return Gorm.Table("checks").Where("canary_id = ? and deleted_at is null", id).UpdateColumn("deleted_at", deleteTime).Error +func DeleteChecksForCanary(id string, deleteTime time.Time) ([]string, error) { + var checkIDs []string + var checks []pkg.Check + err := Gorm.Model(&checks). + Table("checks"). + Clauses(clause.Returning{Columns: []clause.Column{{Name: "id"}}}). + Where("canary_id = ? and deleted_at IS NULL", id). + UpdateColumn("deleted_at", deleteTime). + Error + + for _, c := range checks { + checkIDs = append(checkIDs, c.ID.String()) + } + return checkIDs, err } func DeleteCheckComponentRelationshipsForCanary(id string, deleteTime time.Time) error { @@ -285,7 +287,7 @@ func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, err func CreateCanary(canary *pkg.Canary) error { if canary.Spec == nil || len(canary.Spec) == 0 { empty := []byte("{}") - canary.Spec = types.JSON(empty) + canary.Spec = dutyTypes.JSON(types.JSON(empty)) } return Gorm.Create(canary).Error @@ -295,16 +297,9 @@ func CreateCheck(canary pkg.Canary, check *pkg.Check) error { return Gorm.Create(&check).Error } -func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]string, bool, error) { +func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool, error) { + var err error changed := false - model, err := pkg.CanaryFromV1(canary) - if err != nil { - return nil, nil, changed, err - } - if canary.GetPersistedID() != "" { - model.ID, _ = uuid.Parse(canary.GetPersistedID()) - } - model.Source = source tx := Gorm.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "agent_id"}, {Name: "name"}, {Name: "namespace"}, {Name: "source"}}, UpdateAll: true, @@ -333,15 +328,15 @@ func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]str return nil, nil, changed, err } + var spec v1.CanarySpec + if err = json.Unmarshal(model.Spec, &spec); err != nil { + return nil, nil, changed, err + } + var checks = make(map[string]string) var newCheckIDs []string - for _, config := range canary.Spec.GetAllChecks() { + for _, config := range spec.GetAllChecks() { check := pkg.FromExternalCheck(model, config) - // not creating the new check if already exists in the status - // status is not patched correctly with the status id - if checkID := canary.GetCheckID(check.Name); checkID != "" { - check.ID, _ = uuid.Parse(checkID) - } check.Spec, _ = json.Marshal(config) id, err := PersistCheck(check, model.ID) if err != nil { @@ -361,9 +356,25 @@ func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]str } metrics.UnregisterGauge(checkIDsToRemove) } + + model.Checks = checks return &model, checks, changed, nil } +func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]string, bool, error) { + changed := false + model, err := pkg.CanaryFromV1(canary) + if err != nil { + return nil, nil, changed, err + } + if canary.GetPersistedID() != "" { + model.ID, _ = uuid.Parse(canary.GetPersistedID()) + } + model.Source = source + + return PersistCanaryModel(model) +} + func RefreshCheckStatusSummary() { if err := duty.RefreshCheckStatusSummary(Pool); err != nil { logger.Errorf("error refreshing check_status_summary materialized view: %v", err) diff --git a/pkg/db/topology.go b/pkg/db/topology.go index 23189ae9e..b48ca3732 100644 --- a/pkg/db/topology.go +++ b/pkg/db/topology.go @@ -377,7 +377,7 @@ func DeleteInlineCanariesForComponent(componentID string, deleteTime time.Time) return err } for _, c := range canaries { - if err := DeleteChecksForCanary(c.ID.String(), deleteTime); err != nil { + if _, err := DeleteChecksForCanary(c.ID.String(), deleteTime); err != nil { logger.Debugf("Error deleting checks for canary %v", c.ID) continue } diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index c7ec7db0a..5dc1b4e4b 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -17,7 +17,9 @@ import ( "github.com/flanksource/canary-checker/pkg/utils" "github.com/flanksource/commons/logger" "github.com/flanksource/duty/models" + dutyTypes "github.com/flanksource/duty/types" "github.com/flanksource/kommons" + "github.com/google/go-cmp/cmp" "github.com/robfig/cron/v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -365,39 +367,61 @@ func DeleteCanaryJob(id string) { CanaryScheduler.Remove(entry.ID) } -func ReconcileDeletedCanaryChecks() { - jobHistory := models.NewJobHistory("ReconcileDeletedCanaryChecks", "", "").Start() +func ReconcileCanaryChecks() { + logger.Infof("Reconciling Canary Checks") + jobHistory := models.NewJobHistory("ReconcileCanaryChecks", "", "").Start() _ = db.PersistJobHistory(jobHistory) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + canaries, err := db.GetAllCanariesForSync() + if err != nil { + logger.Errorf("Error fetching canaries: %v", err) + jobHistory.AddError(err.Error()) + return + } + canaryCheckMapping := make(map[string]dutyTypes.JSONStringMap) + for _, c := range canaries { + canaryCheckMapping[c.ID.String()] = c.Checks + } + var rows []struct { - ID string - DeletedAt time.Time + CanaryID string + Checks dutyTypes.JSONStringMap } - // Select all components whose topology ID is deleted but their deleted at is not marked - err := db.Gorm.Raw(` - SELECT DISTINCT(canaries.id), canaries.deleted_at - FROM canaries - INNER JOIN checks ON canaries.id = checks.canary_id + db.Gorm.Raw(` + SELECT json_object_agg(checks.name, checks.id) as checks, canary_id + FROM checks WHERE - checks.deleted_at IS NULL AND - canaries.deleted_at IS NOT NULL - `).Scan(&rows).Error + deleted_at IS NULL AND + agent_id = '00000000-0000-0000-0000-000000000000' + GROUP BY canary_id + `).Scan(&rows) - if err != nil { - logger.Errorf("Error fetching deleted canary checks: %v", err) - jobHistory.AddError(err.Error()) + var idsToPersist []string + for _, r := range rows { + if checks, exists := canaryCheckMapping[r.CanaryID]; exists { + if !cmp.Equal(r.Checks, checks) { + idsToPersist = append(idsToPersist, r.CanaryID) + } + } else { + // If the canaryID is not found in map, that means + // check is not deleted but the canary is + logger.Errorf("Canary[%s] is marked deleted but has active checks") + } + } + + var canariesToPersist []pkg.Canary + if err := db.Gorm.Table("canaries").Where("id IN ?", idsToPersist).Find(&canariesToPersist).Error; err != nil { + logger.Errorf("Error fetching canaries: %v", err) return } - for _, r := range rows { - if err := db.DeleteChecksForCanary(r.ID, r.DeletedAt); err != nil { - logger.Errorf("Error deleting checks for canary[%s]: %v", r.ID, err) + for _, c := range canaries { + if _, _, _, err := db.PersistCanaryModel(c); err != nil { + logger.Errorf("Error persisting canary: %v", err) jobHistory.AddError(err.Error()) } - DeleteCanaryJob(r.ID) } - jobHistory.IncrSuccess() } func ScheduleFunc(schedule string, fn func()) (interface{}, error) { diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 32f87ab9e..ed092df05 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -30,7 +30,6 @@ const ( PrometheusGaugeCleanupSchedule = "@every 1h" ReconcileDeletedTopologyComponentsSchedule = "@every 1h" - ReconcileDeletedCanaryChecksSchedule = "@every 1h" ) func Start() { @@ -85,10 +84,11 @@ func Start() { if _, err := ScheduleFunc(ReconcileDeletedTopologyComponentsSchedule, systemJobs.ReconcileDeletedTopologyComponents); err != nil { logger.Errorf("Failed to schedule ReconcileDeletedTopologyComponents: %v", err) } - if _, err := ScheduleFunc(ReconcileDeletedCanaryChecksSchedule, canaryJobs.ReconcileDeletedCanaryChecks); err != nil { - logger.Errorf("Failed to schedule ReconcileDeletedCanaryChecks: %v", err) + if _, err := ScheduleFunc("@every 5m", canaryJobs.ReconcileCanaryChecks); err != nil { + logger.Errorf("Failed to schedule ReconcileCanaryChecks: %v", err) } + canaryJobs.ReconcileCanaryChecks() canaryJobs.CleanupMetricsGauges() canaryJobs.SyncCanaryJobs() systemJobs.SyncTopologyJobs() diff --git a/pkg/topology/run.go b/pkg/topology/run.go index b41892406..5ea5d6783 100644 --- a/pkg/topology/run.go +++ b/pkg/topology/run.go @@ -269,13 +269,21 @@ func lookupConfig(ctx *ComponentContext, property *v1.Property) (*pkg.Property, } templateEnv := map[string]any{ - "config": _config.Spec.ToMapStringAny(), - "tags": _config.Tags.ToMapStringAny(), + "config": _config.Spec, + "tags": toMapStringAny(_config.Tags), } prop.Text, err = gomplate.RunTemplate(templateEnv, property.ConfigLookup.Display.Template.Gomplate()) return prop, err } +func toMapStringAny(m map[string]string) map[string]any { + r := make(map[string]any) + for k, v := range m { + r[k] = v + } + return r +} + func lookupProperty(ctx *ComponentContext, property *v1.Property) (pkg.Properties, error) { prop := pkg.NewProperty(*property)