diff --git a/pkg/db/canary.go b/pkg/db/canary.go index dbd2b0642..792f22a5a 100644 --- a/pkg/db/canary.go +++ b/pkg/db/canary.go @@ -295,16 +295,9 @@ func CreateCheck(canary pkg.Canary, check *pkg.Check) error { return Gorm.Create(&check).Error } -func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]string, bool, error) { +func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool, error) { + var err error changed := false - model, err := pkg.CanaryFromV1(canary) - if err != nil { - return nil, nil, changed, err - } - if canary.GetPersistedID() != "" { - model.ID, _ = uuid.Parse(canary.GetPersistedID()) - } - model.Source = source tx := Gorm.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "agent_id"}, {Name: "name"}, {Name: "namespace"}, {Name: "source"}}, UpdateAll: true, @@ -333,15 +326,15 @@ func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]str return nil, nil, changed, err } + var spec v1.CanarySpec + if err = json.Unmarshal(model.Spec, &spec); err != nil { + return nil, nil, changed, err + } + var checks = make(map[string]string) var newCheckIDs []string - for _, config := range canary.Spec.GetAllChecks() { + for _, config := range spec.GetAllChecks() { check := pkg.FromExternalCheck(model, config) - // not creating the new check if already exists in the status - // status is not patched correctly with the status id - if checkID := canary.GetCheckID(check.Name); checkID != "" { - check.ID, _ = uuid.Parse(checkID) - } check.Spec, _ = json.Marshal(config) id, err := PersistCheck(check, model.ID) if err != nil { @@ -361,9 +354,25 @@ func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]str } metrics.UnregisterGauge(checkIDsToRemove) } + + model.Checks = checks return &model, checks, changed, nil } +func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]string, bool, error) { + changed := false + model, err := pkg.CanaryFromV1(canary) + if err != nil { + return nil, nil, changed, err + } + if canary.GetPersistedID() != "" { + model.ID, _ = uuid.Parse(canary.GetPersistedID()) + } + model.Source = source + + return PersistCanaryModel(model) +} + func RefreshCheckStatusSummary() { if err := duty.RefreshCheckStatusSummary(Pool); err != nil { logger.Errorf("error refreshing check_status_summary materialized view: %v", err) diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index c7ec7db0a..20785f819 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -365,6 +365,47 @@ func DeleteCanaryJob(id string) { CanaryScheduler.Remove(entry.ID) } +func ReconcileCanaryChecks() { + jobHistory := models.NewJobHistory("ReconcileCanaryChecks", "", "").Start() + _ = db.PersistJobHistory(jobHistory) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + // Get id, updateTime of all non deleted canaries + var rows []struct { + ID string + UpdatedAt time.Time + } + if err := db.Gorm.Raw(`SELECT id, updated_at FROM canaries WHERE deleted_at IS NULL`).Scan(&rows); err != nil { + logger.Errorf("Error fetching canaries: %v", err) + return + } + + var idsToPersist []string + for _, r := range rows { + updateTime, exists := canaryUpdateTimeCache.Load(r.ID) + if !exists { + idsToPersist = append(idsToPersist, r.ID) + canaryUpdateTimeCache.Store(r.ID, r.UpdatedAt) + continue + } + if r.UpdatedAt.After(updateTime.(time.Time)) { + idsToPersist = append(idsToPersist, r.ID) + } + } + + var canaries []pkg.Canary + if err := db.Gorm.Table("canaries").Where("id IN ?", idsToPersist).Find(&canaries).Error; err != nil { + logger.Errorf("Error fetching canaries: %v", err) + return + } + + for _, c := range canaries { + if _, _, _, err := db.PersistCanaryModel(c); err != nil { + logger.Errorf("Error persisting canary: %v", err) + } + } +} + func ReconcileDeletedCanaryChecks() { jobHistory := models.NewJobHistory("ReconcileDeletedCanaryChecks", "", "").Start() _ = db.PersistJobHistory(jobHistory) diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 32f87ab9e..505617d0f 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -88,6 +88,9 @@ func Start() { if _, err := ScheduleFunc(ReconcileDeletedCanaryChecksSchedule, canaryJobs.ReconcileDeletedCanaryChecks); err != nil { logger.Errorf("Failed to schedule ReconcileDeletedCanaryChecks: %v", err) } + if _, err := ScheduleFunc("@every 5m", canaryJobs.ReconcileCanaryChecks); err != nil { + logger.Errorf("Failed to schedule ReconcileCanaryChecks: %v", err) + } canaryJobs.CleanupMetricsGauges() canaryJobs.SyncCanaryJobs()