Skip to content

Commit

Permalink
feat: create new job that pushes the canary results to upstream more
Browse files Browse the repository at this point in the history
frequently
  • Loading branch information
adityathebe committed Aug 2, 2023
1 parent bd6f41a commit bda21d9
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 13 deletions.
81 changes: 75 additions & 6 deletions pkg/jobs/canary/sync_upstream.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package canary

import (
goctx "context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/flanksource/canary-checker/api/context"
v1 "github.com/flanksource/canary-checker/api/v1"
Expand All @@ -22,9 +24,9 @@ var tablesToReconcile = []string{
"check_statuses",
}

// PushCanaryResultsToUpstream coordinates with upstream and pushes any resource
// ReconcileCanaryResults coordinates with upstream and pushes any resource
// that are missing on the upstream.
func PushCanaryResultsToUpstream() {
func ReconcileCanaryResults() {
ctx := context.New(nil, nil, db.Gorm, v1.Canary{})

jobHistory := models.NewJobHistory("PushCanaryResultsToUpstream", "Canary", "")
Expand All @@ -42,20 +44,27 @@ func PushCanaryResultsToUpstream() {
}
}

func Pull() {
// UpstreamPullJob pulls canaries from the upstream
type UpstreamPullJob struct {
lastRuntime time.Time
}

func (t *UpstreamPullJob) Run() {
jobHistory := models.NewJobHistory("PullUpstreamCanaries", "Canary", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

if err := pull(UpstreamConf); err != nil {
if err := t.pull(UpstreamConf); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("Error pulling upstream: %v", err)
logger.Errorf("error pulling from upstream: %v", err)
} else {
jobHistory.IncrSuccess()
}
}

func pull(config upstream.UpstreamConfig) error {
func (t *UpstreamPullJob) pull(config upstream.UpstreamConfig) error {
logger.Tracef("pulling canaries from upstream since: %v", t.lastRuntime)

endpoint, err := url.JoinPath(config.Host, "upstream", "canary", "pull", config.AgentName)
if err != nil {
return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err)
Expand All @@ -68,6 +77,10 @@ func pull(config upstream.UpstreamConfig) error {

req.SetBasicAuth(config.Username, config.Password)

params := url.Values{}
params.Add("since", t.lastRuntime.Format(time.RFC3339))
req.URL.RawQuery = params.Encode()

httpClient := &http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
Expand All @@ -80,12 +93,68 @@ func pull(config upstream.UpstreamConfig) error {
return fmt.Errorf("error decoding response: %w", err)
}

t.lastRuntime = time.Now()

if len(canaries) == 0 {
return nil
}

logger.Tracef("fetched %d canaries from upstream", len(canaries))

return db.Gorm.Omit("agent_id").Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
UpdateAll: true,
}).Create(&canaries).Error
}

type UpstreamPushJob struct {
lastRuntime time.Time

// MaxAge defines how far back we look into the past on startup whe
// lastRuntime is zero.
MaxAge time.Duration
}

func (t *UpstreamPushJob) Run() {
jobHistory := models.NewJobHistory("UpstreamPushJob", "Canary", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

if err := t.run(); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("error pushing to upstream: %v", err)
} else {
jobHistory.IncrSuccess()
}
}

func (t *UpstreamPushJob) run() error {
logger.Tracef("running upstream push job")

var currentTime time.Time
if err := db.Gorm.Raw("SELECT NOW()").Scan(&currentTime).Error; err != nil {
return err
}

if t.lastRuntime.IsZero() {
t.lastRuntime = currentTime.Add(-t.MaxAge)
}

pushData := &upstream.PushData{AgentName: UpstreamConf.AgentName}
if err := db.Gorm.Where("created_at > ?", t.lastRuntime).Find(&pushData.CheckStatuses).Error; err != nil {
return err
}

if err := db.Gorm.Where("updated_at > ?", t.lastRuntime).Find(&pushData.Checks).Error; err != nil {
return err
}

t.lastRuntime = currentTime

if pushData.Count() == 0 {
return nil
}
logger.Tracef("pushing %d canary results to upstream", pushData.Count())

return upstream.Push(goctx.Background(), UpstreamConf, pushData)
}
24 changes: 17 additions & 7 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package jobs

import (
"time"

"github.com/flanksource/canary-checker/pkg/db"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
systemJobs "github.com/flanksource/canary-checker/pkg/jobs/system"
Expand All @@ -15,7 +17,8 @@ var FuncScheduler = cron.New()

const (
PullCanaryFromUpstreamSchedule = "@every 2m"
PushCanaryToUpstreamSchedule = "@every 5m"
PushCanaryToUpstreamSchedule = "@every 10s"
ReconcileCanaryToUpstreamSchedule = "@every 3h"
SyncCanaryJobsSchedule = "@every 2m"
SyncSystemsJobsSchedule = "@every 5m"
ComponentRunSchedule = "@every 2m"
Expand All @@ -42,15 +45,22 @@ func Start() {
FuncScheduler.Start()

if canaryJobs.UpstreamConf.Valid() {
canaryJobs.Pull()
canaryJobs.PushCanaryResultsToUpstream()
pushJob := &canaryJobs.UpstreamPushJob{MaxAge: time.Minute * 5}
pushJob.Run()

pullJob := &canaryJobs.UpstreamPullJob{}
pullJob.Run()

if _, err := FuncScheduler.AddJob(PullCanaryFromUpstreamSchedule, pullJob); err != nil {
logger.Fatalf("Failed to schedule job [canaryJobs.Pull]: %v", err)
}

if _, err := ScheduleFunc(PullCanaryFromUpstreamSchedule, canaryJobs.Pull); err != nil {
logger.Errorf("Failed to schedule job [canaryJobs.Pull]: %v", err)
if _, err := FuncScheduler.AddJob(PushCanaryToUpstreamSchedule, pushJob); err != nil {
logger.Fatalf("Failed to schedule job [canaryJobs.UpstreamPushJob]: %v", err)
}

if _, err := ScheduleFunc(PushCanaryToUpstreamSchedule, canaryJobs.PushCanaryResultsToUpstream); err != nil {
logger.Errorf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err)
if _, err := ScheduleFunc(ReconcileCanaryToUpstreamSchedule, canaryJobs.ReconcileCanaryResults); err != nil {
logger.Fatalf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err)
}
}

Expand Down

0 comments on commit bda21d9

Please sign in to comment.