diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index cedfb26be..b033320e6 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -1,10 +1,12 @@ package canary import ( + goctx "context" "encoding/json" "fmt" "net/http" "net/url" + "time" "github.com/flanksource/canary-checker/api/context" v1 "github.com/flanksource/canary-checker/api/v1" @@ -22,9 +24,9 @@ var tablesToReconcile = []string{ "check_statuses", } -// PushCanaryResultsToUpstream coordinates with upstream and pushes any resource +// ReconcileCanaryResults coordinates with upstream and pushes any resource // that are missing on the upstream. -func PushCanaryResultsToUpstream() { +func ReconcileCanaryResults() { ctx := context.New(nil, nil, db.Gorm, v1.Canary{}) jobHistory := models.NewJobHistory("PushCanaryResultsToUpstream", "Canary", "") @@ -42,20 +44,27 @@ func PushCanaryResultsToUpstream() { } } -func Pull() { +// UpstreamPullJob pulls canaries from the upstream +type UpstreamPullJob struct { + lastRuntime time.Time +} + +func (t *UpstreamPullJob) Run() { jobHistory := models.NewJobHistory("PullUpstreamCanaries", "Canary", "") _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - if err := pull(UpstreamConf); err != nil { + if err := t.pull(UpstreamConf); err != nil { jobHistory.AddError(err.Error()) - logger.Errorf("Error pulling upstream: %v", err) + logger.Errorf("error pulling from upstream: %v", err) } else { jobHistory.IncrSuccess() } } -func pull(config upstream.UpstreamConfig) error { +func (t *UpstreamPullJob) pull(config upstream.UpstreamConfig) error { + logger.Tracef("pulling canaries from upstream since: %v", t.lastRuntime) + endpoint, err := url.JoinPath(config.Host, "upstream", "canary", "pull", config.AgentName) if err != nil { return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) @@ -68,6 +77,10 @@ func pull(config upstream.UpstreamConfig) error { req.SetBasicAuth(config.Username, config.Password) + params := url.Values{} + params.Add("since", t.lastRuntime.Format(time.RFC3339)) + req.URL.RawQuery = params.Encode() + httpClient := &http.Client{} resp, err := httpClient.Do(req) if err != nil { @@ -80,12 +93,68 @@ func pull(config upstream.UpstreamConfig) error { return fmt.Errorf("error decoding response: %w", err) } + t.lastRuntime = time.Now() + if len(canaries) == 0 { return nil } + logger.Tracef("fetched %d canaries from upstream", len(canaries)) + return db.Gorm.Omit("agent_id").Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "id"}}, UpdateAll: true, }).Create(&canaries).Error } + +type UpstreamPushJob struct { + lastRuntime time.Time + + // MaxAge defines how far back we look into the past on startup whe + // lastRuntime is zero. + MaxAge time.Duration +} + +func (t *UpstreamPushJob) Run() { + jobHistory := models.NewJobHistory("UpstreamPushJob", "Canary", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + if err := t.run(); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("error pushing to upstream: %v", err) + } else { + jobHistory.IncrSuccess() + } +} + +func (t *UpstreamPushJob) run() error { + logger.Tracef("running upstream push job") + + var currentTime time.Time + if err := db.Gorm.Raw("SELECT NOW()").Scan(¤tTime).Error; err != nil { + return err + } + + if t.lastRuntime.IsZero() { + t.lastRuntime = currentTime.Add(-t.MaxAge) + } + + pushData := &upstream.PushData{AgentName: UpstreamConf.AgentName} + if err := db.Gorm.Where("created_at > ?", t.lastRuntime).Find(&pushData.CheckStatuses).Error; err != nil { + return err + } + + if err := db.Gorm.Where("updated_at > ?", t.lastRuntime).Find(&pushData.Checks).Error; err != nil { + return err + } + + t.lastRuntime = currentTime + + if pushData.Count() == 0 { + return nil + } + logger.Tracef("pushing %d canary results to upstream", pushData.Count()) + + return upstream.Push(goctx.Background(), UpstreamConf, pushData) +} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index d6d0caaf1..7b18e99a8 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -1,6 +1,8 @@ package jobs import ( + "time" + "github.com/flanksource/canary-checker/pkg/db" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" systemJobs "github.com/flanksource/canary-checker/pkg/jobs/system" @@ -15,7 +17,8 @@ var FuncScheduler = cron.New() const ( PullCanaryFromUpstreamSchedule = "@every 2m" - PushCanaryToUpstreamSchedule = "@every 5m" + PushCanaryToUpstreamSchedule = "@every 10s" + ReconcileCanaryToUpstreamSchedule = "@every 3h" SyncCanaryJobsSchedule = "@every 2m" SyncSystemsJobsSchedule = "@every 5m" ComponentRunSchedule = "@every 2m" @@ -42,15 +45,22 @@ func Start() { FuncScheduler.Start() if canaryJobs.UpstreamConf.Valid() { - canaryJobs.Pull() - canaryJobs.PushCanaryResultsToUpstream() + pushJob := &canaryJobs.UpstreamPushJob{MaxAge: time.Minute * 5} + pushJob.Run() + + pullJob := &canaryJobs.UpstreamPullJob{} + pullJob.Run() + + if _, err := FuncScheduler.AddJob(PullCanaryFromUpstreamSchedule, pullJob); err != nil { + logger.Fatalf("Failed to schedule job [canaryJobs.Pull]: %v", err) + } - if _, err := ScheduleFunc(PullCanaryFromUpstreamSchedule, canaryJobs.Pull); err != nil { - logger.Errorf("Failed to schedule job [canaryJobs.Pull]: %v", err) + if _, err := FuncScheduler.AddJob(PushCanaryToUpstreamSchedule, pushJob); err != nil { + logger.Fatalf("Failed to schedule job [canaryJobs.UpstreamPushJob]: %v", err) } - if _, err := ScheduleFunc(PushCanaryToUpstreamSchedule, canaryJobs.PushCanaryResultsToUpstream); err != nil { - logger.Errorf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err) + if _, err := ScheduleFunc(ReconcileCanaryToUpstreamSchedule, canaryJobs.ReconcileCanaryResults); err != nil { + logger.Fatalf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err) } }