Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pull canaries from upstream and push back the results #1163

Merged
merged 10 commits into from
Aug 4, 2023
7 changes: 7 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/flanksource/canary-checker/pkg/cache"
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/commons/logger"
gomplate "github.com/flanksource/gomplate/v3"
Expand Down Expand Up @@ -56,6 +57,12 @@ func ServerFlags(flags *pflag.FlagSet) {
flags.IntVar(&db.CheckStatusRetentionDays, "check-status-retention-period", db.DefaultCheckStatusRetentionDays, "Check status retention period in days")
flags.IntVar(&db.CheckRetentionDays, "check-retention-period", db.DefaultCheckRetentionDays, "Check retention period in days")
flags.IntVar(&db.CanaryRetentionDays, "canary-retention-period", db.DefaultCanaryRetentionDays, "Canary retention period in days")

// Flags for push/pull
flags.StringVar(&canary.UpstreamConf.Host, "upstream-host", "", "central canary checker instance to push/pull canaries")
flags.StringVar(&canary.UpstreamConf.Username, "upstream-user", "", "upstream username")
flags.StringVar(&canary.UpstreamConf.Password, "upstream-password", "", "upstream password")
flags.StringVar(&canary.UpstreamConf.AgentName, "agent-name", "", "name of this agent")
}

func readFromEnv(v string) string {
Expand Down
1 change: 1 addition & 0 deletions pkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func FromResult(result CheckResult) CheckStatus {
Check: &result.Check,
}
}

func FromV1(canary v1.Canary, check external.Check, statuses ...CheckStatus) Check {
canaryID, _ := uuid.Parse(canary.GetPersistedID())
c := Check{
Expand Down
8 changes: 7 additions & 1 deletion pkg/cache/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ func NewPostgresCache(pool *pgxpool.Pool) *postgresCache {
}

func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) []string {
var checkIDs []string
checkIDs := make([]string, 0, len(statii))

for _, status := range statii {
if status.Status {
check.Status = "healthy"
} else {
check.Status = "unhealthy"
}

checkID, err := c.AddCheckFromStatus(check, status)
if err != nil {
logger.Errorf("error persisting check with canary %s: %v", check.CanaryID, err)
Expand All @@ -43,6 +45,7 @@ func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) []string
}
c.AddCheckStatus(check, status)
}

return checkIDs
}

Expand All @@ -63,6 +66,7 @@ func (c *postgresCache) AddCheckStatus(check pkg.Check, status pkg.CheckStatus)
if err != nil {
logger.Errorf("error marshalling details: %v", err)
}

checks := pkg.Checks{}
if db.Gorm.Model(&checks).
Clauses(clause.Returning{Columns: []clause.Column{{Name: "id"}}}).
Expand All @@ -71,6 +75,7 @@ func (c *postgresCache) AddCheckStatus(check pkg.Check, status pkg.CheckStatus)
logger.Errorf("error updating check: %v", err)
return
}

if len(checks) == 0 || checks[0].ID == uuid.Nil {
logger.Debugf("check not found")
return
Expand Down Expand Up @@ -98,6 +103,7 @@ func (c *postgresCache) AddCheckStatus(check pkg.Check, status pkg.CheckStatus)
status.Status,
status.Time,
)

if err != nil {
logger.Errorf("error adding check status to postgres: %v", err)
}
Expand Down
165 changes: 165 additions & 0 deletions pkg/jobs/canary/sync_upstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package canary

import (
goctx "context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/flanksource/canary-checker/api/context"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/upstream"
"gorm.io/gorm/clause"
)

var UpstreamConf upstream.UpstreamConfig

var tablesToReconcile = []string{
"checks",
"check_statuses",
}

// ReconcileCanaryResults coordinates with upstream and pushes any resource
// that are missing on the upstream.
func ReconcileCanaryResults() {
ctx := context.New(nil, nil, db.Gorm, v1.Canary{})

jobHistory := models.NewJobHistory("PushCanaryResultsToUpstream", "Canary", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

reconciler := upstream.NewUpstreamReconciler(UpstreamConf, 5)
for _, table := range tablesToReconcile {
if err := reconciler.Sync(ctx, table); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("failed to sync table %s: %v", table, err)
} else {
jobHistory.IncrSuccess()
}
}
}

type CanaryPullResponse struct {
Before time.Time `json:"before"`
Canaries []models.Canary `json:"canaries,omitempty"`
}

// UpstreamPullJob pulls canaries from the upstream
type UpstreamPullJob struct {
lastRuntime time.Time
}

func (t *UpstreamPullJob) Run() {
jobHistory := models.NewJobHistory("PullUpstreamCanaries", "Canary", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

if err := t.pull(UpstreamConf); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("error pulling from upstream: %v", err)
} else {
jobHistory.IncrSuccess()
}
}

func (t *UpstreamPullJob) pull(config upstream.UpstreamConfig) error {
logger.Tracef("pulling canaries from upstream since: %v", t.lastRuntime)

endpoint, err := url.JoinPath(config.Host, "upstream", "canary", "pull", config.AgentName)
if err != nil {
return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err)
}

req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return fmt.Errorf("error creating new http request: %w", err)
}

req.SetBasicAuth(config.Username, config.Password)

params := url.Values{}
params.Add("since", t.lastRuntime.Format(time.RFC3339))
req.URL.RawQuery = params.Encode()

httpClient := &http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("error making request: %w", err)
}
defer resp.Body.Close()

var response CanaryPullResponse
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return fmt.Errorf("error decoding response: %w", err)
}

t.lastRuntime = response.Before

if len(response.Canaries) == 0 {
return nil
}

logger.Tracef("fetched %d canaries from upstream", len(response.Canaries))

return db.Gorm.Omit("agent_id").Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
UpdateAll: true,
}).Create(&response.Canaries).Error
}

type UpstreamPushJob struct {
lastRuntime time.Time

// MaxAge defines how far back we look into the past on startup whe
// lastRuntime is zero.
MaxAge time.Duration
}

func (t *UpstreamPushJob) Run() {
jobHistory := models.NewJobHistory("UpstreamPushJob", "Canary", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

if err := t.run(); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("error pushing to upstream: %v", err)
} else {
jobHistory.IncrSuccess()
}
}

func (t *UpstreamPushJob) run() error {
logger.Tracef("running upstream push job")

var currentTime time.Time
if err := db.Gorm.Raw("SELECT NOW()").Scan(&currentTime).Error; err != nil {
return err
}

if t.lastRuntime.IsZero() {
t.lastRuntime = currentTime.Add(-t.MaxAge)
}

pushData := &upstream.PushData{AgentName: UpstreamConf.AgentName}
if err := db.Gorm.Where("created_at > ?", t.lastRuntime).Find(&pushData.CheckStatuses).Error; err != nil {
return err
}

if err := db.Gorm.Where("updated_at > ?", t.lastRuntime).Find(&pushData.Checks).Error; err != nil {
return err
}

t.lastRuntime = currentTime

if pushData.Count() == 0 {
return nil
}
logger.Tracef("pushing %d canary results to upstream", pushData.Count())

return upstream.Push(goctx.Background(), UpstreamConf, pushData)
}
25 changes: 25 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package jobs

import (
"time"

"github.com/flanksource/canary-checker/pkg/db"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
systemJobs "github.com/flanksource/canary-checker/pkg/jobs/system"
Expand All @@ -14,6 +16,9 @@ import (
var FuncScheduler = cron.New()

const (
PullCanaryFromUpstreamSchedule = "@every 30s"
PushCanaryToUpstreamSchedule = "@every 10s"
ReconcileCanaryToUpstreamSchedule = "@every 3h"
SyncCanaryJobsSchedule = "@every 2m"
SyncSystemsJobsSchedule = "@every 5m"
ComponentRunSchedule = "@every 2m"
Expand All @@ -39,6 +44,26 @@ func Start() {
canaryJobs.CanaryScheduler.Start()
FuncScheduler.Start()

if canaryJobs.UpstreamConf.Valid() {
pushJob := &canaryJobs.UpstreamPushJob{MaxAge: time.Minute * 5}
pushJob.Run()

pullJob := &canaryJobs.UpstreamPullJob{}
pullJob.Run()

if _, err := FuncScheduler.AddJob(PullCanaryFromUpstreamSchedule, pullJob); err != nil {
logger.Fatalf("Failed to schedule job [canaryJobs.Pull]: %v", err)
}

if _, err := FuncScheduler.AddJob(PushCanaryToUpstreamSchedule, pushJob); err != nil {
logger.Fatalf("Failed to schedule job [canaryJobs.UpstreamPushJob]: %v", err)
}

if _, err := ScheduleFunc(ReconcileCanaryToUpstreamSchedule, canaryJobs.ReconcileCanaryResults); err != nil {
logger.Fatalf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err)
}
}

if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs); err != nil {
logger.Errorf("Failed to schedule sync jobs for canary: %v", err)
}
Expand Down
Loading