From 8336d7f639d143dec07643eb324fbbc5d745a8de Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 18 Jul 2023 18:57:09 +0545 Subject: [PATCH 01/10] draft: agent sync --- cmd/root.go | 7 ++++++ cmd/serve.go | 1 + pkg/api/pull.go | 21 +++++++++++++++++ pkg/jobs/canary/canary_jobs.go | 42 ++++++++++++++++++++++++++++++++++ pkg/jobs/jobs.go | 5 ++++ 5 files changed, 76 insertions(+) create mode 100644 pkg/api/pull.go diff --git a/cmd/root.go b/cmd/root.go index c59179232..25752f30f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -5,6 +5,7 @@ import ( "github.com/flanksource/canary-checker/pkg/cache" "github.com/flanksource/canary-checker/pkg/db" + "github.com/flanksource/canary-checker/pkg/jobs/canary" "github.com/flanksource/canary-checker/pkg/runner" "github.com/flanksource/commons/logger" gomplate "github.com/flanksource/gomplate/v3" @@ -56,6 +57,12 @@ func ServerFlags(flags *pflag.FlagSet) { flags.IntVar(&db.CheckStatusRetentionDays, "check-status-retention-period", db.DefaultCheckStatusRetentionDays, "Check status retention period in days") flags.IntVar(&db.CheckRetentionDays, "check-retention-period", db.DefaultCheckRetentionDays, "Check retention period in days") flags.IntVar(&db.CanaryRetentionDays, "canary-retention-period", db.DefaultCanaryRetentionDays, "Canary retention period in days") + + // Flags for push/pull + flags.StringVar(&canary.UpstreamConf.Host, "upstream-host", "", "central canary checker instance to push/pull canaries") + flags.StringVar(&canary.UpstreamConf.Username, "upstream-user", "", "upstream username") + flags.StringVar(&canary.UpstreamConf.Password, "upstream-password", "", "upstream password") + flags.StringVar(&canary.UpstreamConf.AgentName, "agent-name", "", "name of this agent") } func readFromEnv(v string) string { diff --git a/cmd/serve.go b/cmd/serve.go index 7800dd272..602b97479 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -125,6 +125,7 @@ func serve() { e.GET("/api/summary", api.HealthSummary) e.GET("/about", api.About) e.GET("/api/graph", api.CheckDetails) + e.GET("/api/pull/:agent_name", api.Pull) e.POST("/api/push", api.PushHandler) e.GET("/api/details", api.DetailsHandler) e.GET("/api/topology", api.Topology) diff --git a/pkg/api/pull.go b/pkg/api/pull.go new file mode 100644 index 000000000..9998a5c85 --- /dev/null +++ b/pkg/api/pull.go @@ -0,0 +1,21 @@ +package api + +import ( + "net/http" + + "github.com/flanksource/canary-checker/pkg/db" + "github.com/flanksource/duty/models" + "github.com/labstack/echo/v4" +) + +// Pull returns all canaries for the requested agent +func Pull(c echo.Context) error { + agentName := c.Param("agent_name") + + var canaries []models.Canary + if err := db.Gorm.Where("deleted_at IS NULL").Joins("LEFT JOIN agents ON canaries.agent_id = agents.id").Where("agents.name = ?", agentName).Find(&canaries).Error; err != nil { + return errorResonse(c, err, http.StatusInternalServerError) + } + + return c.JSON(http.StatusOK, canaries) +} diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 5dc1b4e4b..510928ae8 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -1,7 +1,10 @@ package canary import ( + "encoding/json" "fmt" + "net/http" + "net/url" "path" "sync" "time" @@ -21,6 +24,7 @@ import ( "github.com/flanksource/kommons" "github.com/google/go-cmp/cmp" "github.com/robfig/cron/v3" + "gorm.io/gorm/clause" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -32,6 +36,15 @@ var DataFile string var Executor bool var LogPass, LogFail bool +type UpstreamConfig struct { + AgentName string + Host string + Username string + Password string +} + +var UpstreamConf UpstreamConfig + var Kommons *kommons.Client var Kubernetes kubernetes.Interface var FuncScheduler = cron.New() @@ -432,3 +445,32 @@ func init() { // We are adding a small buffer to prevent blocking CanaryStatusChannel = make(chan CanaryStatusPayload, 64) } + +func Pull() { + if err := pull(UpstreamConf); err != nil { + logger.Errorf("Error pulling upstream: %v", err) + } +} + +func pull(config UpstreamConfig) error { + endpoint, err := url.JoinPath(UpstreamConf.Host, "api", "pull") + if err != nil { + return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) + } + + resp, err := http.Get(endpoint) + if err != nil { + return err + } + defer resp.Body.Close() + + var canaries []models.Canary + if err := json.NewDecoder(resp.Body).Decode(&canaries); err != nil { + return fmt.Errorf("error decoding response: %w", err) + } + + return db.Gorm.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }).Create(&canaries).Error +} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index ed092df05..5ff31bae5 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -39,6 +39,11 @@ func Start() { canaryJobs.CanaryScheduler.Start() FuncScheduler.Start() + if canaryJobs.UpstreamConf.AgentName != "" { + if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.Pull); err != nil { + logger.Errorf("Failed to pull canaries from central server: %v", err) + } + } if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs); err != nil { logger.Errorf("Failed to schedule sync jobs for canary: %v", err) } From 7d7075f95763a4c7d6ed32f902dfb0101aee8b04 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 19 Jul 2023 14:01:35 +0545 Subject: [PATCH 02/10] added push * moved all db funcs tion pkg/db --- api/v1/sync.go | 17 ++++++++++++++ cmd/serve.go | 1 + pkg/api/agent.go | 42 ++++++++++++++++++++++++++++++++++ pkg/api/pull.go | 21 ----------------- pkg/db/agent.go | 23 +++++++++++++++++++ pkg/db/canary.go | 23 +++++++++++++++++++ pkg/jobs/canary/canary_jobs.go | 6 ++++- pkg/jobs/jobs.go | 3 ++- 8 files changed, 113 insertions(+), 23 deletions(-) create mode 100644 api/v1/sync.go create mode 100644 pkg/api/agent.go delete mode 100644 pkg/api/pull.go create mode 100644 pkg/db/agent.go diff --git a/api/v1/sync.go b/api/v1/sync.go new file mode 100644 index 000000000..a4467c4db --- /dev/null +++ b/api/v1/sync.go @@ -0,0 +1,17 @@ +package v1 + +import ( + "github.com/flanksource/duty/models" + "github.com/google/uuid" +) + +type PushData struct { + Checks []models.Check `json:"checks,omitempty"` + CheckStatuses []models.CheckStatus `json:"check_statuses,omitempty"` +} + +func (t *PushData) SetAgentID(id uuid.UUID) { + for i := range t.Checks { + t.Checks[i].AgentID = id + } +} diff --git a/cmd/serve.go b/cmd/serve.go index 602b97479..2f7113bfa 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -126,6 +126,7 @@ func serve() { e.GET("/about", api.About) e.GET("/api/graph", api.CheckDetails) e.GET("/api/pull/:agent_name", api.Pull) + e.GET("/api/push/:agent_name", api.Push) e.POST("/api/push", api.PushHandler) e.GET("/api/details", api.DetailsHandler) e.GET("/api/topology", api.Topology) diff --git a/pkg/api/agent.go b/pkg/api/agent.go new file mode 100644 index 000000000..040a849a3 --- /dev/null +++ b/pkg/api/agent.go @@ -0,0 +1,42 @@ +package api + +import ( + "encoding/json" + "net/http" + + v1 "github.com/flanksource/canary-checker/api/v1" + "github.com/flanksource/canary-checker/pkg/db" + "github.com/labstack/echo/v4" +) + +// Pull returns all canaries for the requested agent +func Pull(c echo.Context) error { + agentName := c.Param("agent_name") + + canaries, err := db.GetCanariesOfAgent(c.Request().Context(), agentName) + if err != nil { + return errorResonse(c, err, http.StatusInternalServerError) + } + + return c.JSON(http.StatusOK, canaries) +} + +// Push stores all the check statuses sent by the agent +func Push(c echo.Context) error { + agentName := c.Param("agent_name") + + agent, err := db.FindAgent(c.Request().Context(), agentName) + if err != nil { + return errorResonse(c, err, http.StatusInternalServerError) + } else if agent == nil { + return errorResonse(c, err, http.StatusNotFound) + } + + var req v1.PushData + if err := json.NewDecoder(c.Request().Body).Decode(&req); err != nil { + return errorResonse(c, err, http.StatusBadRequest) + } + req.SetAgentID(agent.ID) + + return db.InsertAgentCheckResults(c.Request().Context(), req) +} diff --git a/pkg/api/pull.go b/pkg/api/pull.go deleted file mode 100644 index 9998a5c85..000000000 --- a/pkg/api/pull.go +++ /dev/null @@ -1,21 +0,0 @@ -package api - -import ( - "net/http" - - "github.com/flanksource/canary-checker/pkg/db" - "github.com/flanksource/duty/models" - "github.com/labstack/echo/v4" -) - -// Pull returns all canaries for the requested agent -func Pull(c echo.Context) error { - agentName := c.Param("agent_name") - - var canaries []models.Canary - if err := db.Gorm.Where("deleted_at IS NULL").Joins("LEFT JOIN agents ON canaries.agent_id = agents.id").Where("agents.name = ?", agentName).Find(&canaries).Error; err != nil { - return errorResonse(c, err, http.StatusInternalServerError) - } - - return c.JSON(http.StatusOK, canaries) -} diff --git a/pkg/db/agent.go b/pkg/db/agent.go new file mode 100644 index 000000000..3b852bc81 --- /dev/null +++ b/pkg/db/agent.go @@ -0,0 +1,23 @@ +package db + +import ( + "context" + "errors" + + "github.com/flanksource/duty/models" + "gorm.io/gorm" +) + +func FindAgent(ctx context.Context, name string) (*models.Agent, error) { + var agent models.Agent + err := Gorm.WithContext(ctx).Where("name = ?", name).First(&agent).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + + return nil, err + } + + return &agent, nil +} diff --git a/pkg/db/canary.go b/pkg/db/canary.go index 9b45bb3f5..e1068be30 100644 --- a/pkg/db/canary.go +++ b/pkg/db/canary.go @@ -22,6 +22,29 @@ import ( "gorm.io/gorm/clause" ) +func GetCanariesOfAgent(ctx context.Context, agentName string) ([]models.Canary, error) { + var canaries []models.Canary + err := Gorm.WithContext(ctx).Where("deleted_at IS NULL").Joins("LEFT JOIN agents ON canaries.agent_id = agents.id").Where("agents.name = ?", agentName).Find(&canaries).Error + return canaries, err +} + +func InsertAgentCheckResults(ctx context.Context, req v1.PushData) error { + if len(req.Checks) > 0 { + if err := Gorm.Clauses(clause.OnConflict{UpdateAll: true}).CreateInBatches(req.Checks, 500).Error; err != nil { + return fmt.Errorf("error upserting checks: %w", err) + } + } + + if len(req.CheckStatuses) > 0 { + cols := []clause.Column{{Name: "check_id"}, {Name: "time"}} + if err := Gorm.Clauses(clause.OnConflict{UpdateAll: true, Columns: cols}).CreateInBatches(req.CheckStatuses, 500).Error; err != nil { + return fmt.Errorf("error upserting check_statuses: %w", err) + } + } + + return nil +} + func GetAllCanariesForSync() ([]pkg.Canary, error) { var _canaries []pkg.Canary var rawCanaries interface{} diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 510928ae8..d19901a84 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -43,6 +43,10 @@ type UpstreamConfig struct { Password string } +func (t *UpstreamConfig) Valid() bool { + return t.Host != "" && t.Username != "" && t.Password != "" && t.AgentName != "" +} + var UpstreamConf UpstreamConfig var Kommons *kommons.Client @@ -453,7 +457,7 @@ func Pull() { } func pull(config UpstreamConfig) error { - endpoint, err := url.JoinPath(UpstreamConf.Host, "api", "pull") + endpoint, err := url.JoinPath(UpstreamConf.Host, "api", "pull", config.AgentName) if err != nil { return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) } diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 5ff31bae5..e59b9eddf 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -39,7 +39,8 @@ func Start() { canaryJobs.CanaryScheduler.Start() FuncScheduler.Start() - if canaryJobs.UpstreamConf.AgentName != "" { + if canaryJobs.UpstreamConf.Valid() { + canaryJobs.Pull() if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.Pull); err != nil { logger.Errorf("Failed to pull canaries from central server: %v", err) } From f14cb3caa3af05de6fb1802723415088c4b79f91 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 21 Jul 2023 11:36:54 +0545 Subject: [PATCH 03/10] feat: push check statuses to upstream --- api/v1/sync.go | 17 --------- cmd/serve.go | 2 -- pkg/api.go | 1 + pkg/api/agent.go | 42 ---------------------- pkg/cache/postgres.go | 33 ++++++++++++++--- pkg/db/canary.go | 23 ------------ pkg/jobs/canary/canary_jobs.go | 66 +++++++++++++++++++++++++++++++++- 7 files changed, 94 insertions(+), 90 deletions(-) delete mode 100644 api/v1/sync.go delete mode 100644 pkg/api/agent.go diff --git a/api/v1/sync.go b/api/v1/sync.go deleted file mode 100644 index a4467c4db..000000000 --- a/api/v1/sync.go +++ /dev/null @@ -1,17 +0,0 @@ -package v1 - -import ( - "github.com/flanksource/duty/models" - "github.com/google/uuid" -) - -type PushData struct { - Checks []models.Check `json:"checks,omitempty"` - CheckStatuses []models.CheckStatus `json:"check_statuses,omitempty"` -} - -func (t *PushData) SetAgentID(id uuid.UUID) { - for i := range t.Checks { - t.Checks[i].AgentID = id - } -} diff --git a/cmd/serve.go b/cmd/serve.go index 2f7113bfa..7800dd272 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -125,8 +125,6 @@ func serve() { e.GET("/api/summary", api.HealthSummary) e.GET("/about", api.About) e.GET("/api/graph", api.CheckDetails) - e.GET("/api/pull/:agent_name", api.Pull) - e.GET("/api/push/:agent_name", api.Push) e.POST("/api/push", api.PushHandler) e.GET("/api/details", api.DetailsHandler) e.GET("/api/topology", api.Topology) diff --git a/pkg/api.go b/pkg/api.go index 133a4d85f..55c889f75 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -240,6 +240,7 @@ func FromResult(result CheckResult) CheckStatus { Check: &result.Check, } } + func FromV1(canary v1.Canary, check external.Check, statuses ...CheckStatus) Check { canaryID, _ := uuid.Parse(canary.GetPersistedID()) c := Check{ diff --git a/pkg/api/agent.go b/pkg/api/agent.go deleted file mode 100644 index 040a849a3..000000000 --- a/pkg/api/agent.go +++ /dev/null @@ -1,42 +0,0 @@ -package api - -import ( - "encoding/json" - "net/http" - - v1 "github.com/flanksource/canary-checker/api/v1" - "github.com/flanksource/canary-checker/pkg/db" - "github.com/labstack/echo/v4" -) - -// Pull returns all canaries for the requested agent -func Pull(c echo.Context) error { - agentName := c.Param("agent_name") - - canaries, err := db.GetCanariesOfAgent(c.Request().Context(), agentName) - if err != nil { - return errorResonse(c, err, http.StatusInternalServerError) - } - - return c.JSON(http.StatusOK, canaries) -} - -// Push stores all the check statuses sent by the agent -func Push(c echo.Context) error { - agentName := c.Param("agent_name") - - agent, err := db.FindAgent(c.Request().Context(), agentName) - if err != nil { - return errorResonse(c, err, http.StatusInternalServerError) - } else if agent == nil { - return errorResonse(c, err, http.StatusNotFound) - } - - var req v1.PushData - if err := json.NewDecoder(c.Request().Body).Decode(&req); err != nil { - return errorResonse(c, err, http.StatusBadRequest) - } - req.SetAgentID(agent.ID) - - return db.InsertAgentCheckResults(c.Request().Context(), req) -} diff --git a/pkg/cache/postgres.go b/pkg/cache/postgres.go index 3e1d9c0cb..b893e73ae 100644 --- a/pkg/cache/postgres.go +++ b/pkg/cache/postgres.go @@ -27,23 +27,41 @@ func NewPostgresCache(pool *pgxpool.Pool) *postgresCache { } } -func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) []string { - var checkIDs []string +func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) ([]string, []models.CheckStatus) { + var ( + checkIDs = make([]string, 0, len(statii)) + checkStatuses = make([]models.CheckStatus, 0, len(statii)) + ) + for _, status := range statii { if status.Status { check.Status = "healthy" } else { check.Status = "unhealthy" } + checkID, err := c.AddCheckFromStatus(check, status) if err != nil { logger.Errorf("error persisting check with canary %s: %v", check.CanaryID, err) } else { checkIDs = append(checkIDs, checkID.String()) } - c.AddCheckStatus(check, status) + + s := &models.CheckStatus{ + Status: status.Status, + Invalid: status.Invalid, + Time: status.Time, + Duration: int(status.Duration), + Message: status.Message, + Error: status.Error, + Detail: status.Detail, + } + + c.AddCheckStatus(check, s) + checkStatuses = append(checkStatuses, *s) } - return checkIDs + + return checkIDs, checkStatuses } func (c *postgresCache) AddCheckFromStatus(check pkg.Check, status pkg.CheckStatus) (uuid.UUID, error) { @@ -58,11 +76,12 @@ func (c *postgresCache) AddCheckFromStatus(check pkg.Check, status pkg.CheckStat return db.PersistCheck(check, check.CanaryID) } -func (c *postgresCache) AddCheckStatus(check pkg.Check, status pkg.CheckStatus) { +func (c *postgresCache) AddCheckStatus(check pkg.Check, status *models.CheckStatus) { jsonDetails, err := json.Marshal(status.Detail) if err != nil { logger.Errorf("error marshalling details: %v", err) } + checks := pkg.Checks{} if db.Gorm.Model(&checks). Clauses(clause.Returning{Columns: []clause.Column{{Name: "id"}}}). @@ -71,10 +90,13 @@ func (c *postgresCache) AddCheckStatus(check pkg.Check, status pkg.CheckStatus) logger.Errorf("error updating check: %v", err) return } + if len(checks) == 0 || checks[0].ID == uuid.Nil { logger.Debugf("check not found") return } + status.CheckID = checks[0].ID + _, err = c.Exec(context.TODO(), `INSERT INTO check_statuses( check_id, details, @@ -98,6 +120,7 @@ func (c *postgresCache) AddCheckStatus(check pkg.Check, status pkg.CheckStatus) status.Status, status.Time, ) + if err != nil { logger.Errorf("error adding check status to postgres: %v", err) } diff --git a/pkg/db/canary.go b/pkg/db/canary.go index e1068be30..9b45bb3f5 100644 --- a/pkg/db/canary.go +++ b/pkg/db/canary.go @@ -22,29 +22,6 @@ import ( "gorm.io/gorm/clause" ) -func GetCanariesOfAgent(ctx context.Context, agentName string) ([]models.Canary, error) { - var canaries []models.Canary - err := Gorm.WithContext(ctx).Where("deleted_at IS NULL").Joins("LEFT JOIN agents ON canaries.agent_id = agents.id").Where("agents.name = ?", agentName).Find(&canaries).Error - return canaries, err -} - -func InsertAgentCheckResults(ctx context.Context, req v1.PushData) error { - if len(req.Checks) > 0 { - if err := Gorm.Clauses(clause.OnConflict{UpdateAll: true}).CreateInBatches(req.Checks, 500).Error; err != nil { - return fmt.Errorf("error upserting checks: %w", err) - } - } - - if len(req.CheckStatuses) > 0 { - cols := []clause.Column{{Name: "check_id"}, {Name: "time"}} - if err := Gorm.Clauses(clause.OnConflict{UpdateAll: true, Columns: cols}).CreateInBatches(req.CheckStatuses, 500).Error; err != nil { - return fmt.Errorf("error upserting check_statuses: %w", err) - } - } - - return nil -} - func GetAllCanariesForSync() ([]pkg.Canary, error) { var _canaries []pkg.Canary var rawCanaries interface{} diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index d19901a84..25efc1278 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -1,8 +1,10 @@ package canary import ( + "bytes" "encoding/json" "fmt" + "io" "net/http" "net/url" "path" @@ -18,6 +20,7 @@ import ( "github.com/flanksource/canary-checker/pkg/metrics" "github.com/flanksource/canary-checker/pkg/push" "github.com/flanksource/canary-checker/pkg/utils" + "github.com/flanksource/commons/collections" "github.com/flanksource/commons/logger" "github.com/flanksource/duty/models" dutyTypes "github.com/flanksource/duty/types" @@ -125,16 +128,33 @@ func (job CanaryJob) Run() { return } + pushData := PushData{ + CheckStatuses: make([]models.CheckStatus, 0, len(results)), + } + for _, result := range results { if job.LogPass && result.Pass || job.LogFail && !result.Pass { logger.Infof(result.String()) } - transformedChecksAdded := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.FromResult(*result)) + + transformedChecksAdded, checkStatus := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.FromResult(*result)) + + if UpstreamConf.Valid() { + pushData.CheckStatuses = append(pushData.CheckStatuses, checkStatus...) + } + transformedChecksCreated = append(transformedChecksCreated, transformedChecksAdded...) for _, checkID := range transformedChecksAdded { checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy() } } + + if UpstreamConf.Valid() && !pushData.Empty() { + if err := pushToUpstream(pushData); err != nil { + logger.Errorf("failed to push check results to upstream: %v", err) + } + } + job.updateStatusAndEvent(results) checkDeleteStrategyGroup := make(map[string][]string) @@ -478,3 +498,47 @@ func pull(config UpstreamConfig) error { UpdateAll: true, }).Create(&canaries).Error } + +type PushData struct { + AgentName string `json:"agent_name,omitempty"` + Checks []models.Check `json:"checks,omitempty"` + CheckStatuses []models.CheckStatus `json:"check_statuses,omitempty"` +} + +func (t *PushData) Empty() bool { + return len(t.Checks) == 0 && len(t.CheckStatuses) == 0 +} + +func pushToUpstream(data PushData) error { + data.AgentName = UpstreamConf.AgentName + payloadBuf := new(bytes.Buffer) + if err := json.NewEncoder(payloadBuf).Encode(data); err != nil { + return fmt.Errorf("error encoding msg: %w", err) + } + + endpoint, err := url.JoinPath(UpstreamConf.Host, "upstream", "push") + if err != nil { + return fmt.Errorf("error creating url endpoint for host %s: %w", UpstreamConf.Host, err) + } + + req, err := http.NewRequest(http.MethodPost, endpoint, payloadBuf) + if err != nil { + return fmt.Errorf("http.NewRequest: %w", err) + } + + req.SetBasicAuth(UpstreamConf.Username, UpstreamConf.Password) + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + if !collections.Contains([]int{http.StatusOK, http.StatusCreated}, resp.StatusCode) { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("upstream server returned error status[%d]: %s", resp.StatusCode, string(respBody)) + } + + return nil +} From a2d0928ac10eae9ff76146a86d6f8a94bd2c0871 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 21 Jul 2023 11:54:43 +0545 Subject: [PATCH 04/10] fix: add basic auth when pulling --- pkg/db/agent.go | 23 ------- pkg/jobs/canary/canary_jobs.go | 80 ------------------------ pkg/jobs/canary/sync_upstream.go | 101 +++++++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 103 deletions(-) delete mode 100644 pkg/db/agent.go create mode 100644 pkg/jobs/canary/sync_upstream.go diff --git a/pkg/db/agent.go b/pkg/db/agent.go deleted file mode 100644 index 3b852bc81..000000000 --- a/pkg/db/agent.go +++ /dev/null @@ -1,23 +0,0 @@ -package db - -import ( - "context" - "errors" - - "github.com/flanksource/duty/models" - "gorm.io/gorm" -) - -func FindAgent(ctx context.Context, name string) (*models.Agent, error) { - var agent models.Agent - err := Gorm.WithContext(ctx).Where("name = ?", name).First(&agent).Error - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, nil - } - - return nil, err - } - - return &agent, nil -} diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 25efc1278..3fa537a65 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -1,12 +1,7 @@ package canary import ( - "bytes" - "encoding/json" "fmt" - "io" - "net/http" - "net/url" "path" "sync" "time" @@ -20,14 +15,12 @@ import ( "github.com/flanksource/canary-checker/pkg/metrics" "github.com/flanksource/canary-checker/pkg/push" "github.com/flanksource/canary-checker/pkg/utils" - "github.com/flanksource/commons/collections" "github.com/flanksource/commons/logger" "github.com/flanksource/duty/models" dutyTypes "github.com/flanksource/duty/types" "github.com/flanksource/kommons" "github.com/google/go-cmp/cmp" "github.com/robfig/cron/v3" - "gorm.io/gorm/clause" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -469,76 +462,3 @@ func init() { // We are adding a small buffer to prevent blocking CanaryStatusChannel = make(chan CanaryStatusPayload, 64) } - -func Pull() { - if err := pull(UpstreamConf); err != nil { - logger.Errorf("Error pulling upstream: %v", err) - } -} - -func pull(config UpstreamConfig) error { - endpoint, err := url.JoinPath(UpstreamConf.Host, "api", "pull", config.AgentName) - if err != nil { - return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) - } - - resp, err := http.Get(endpoint) - if err != nil { - return err - } - defer resp.Body.Close() - - var canaries []models.Canary - if err := json.NewDecoder(resp.Body).Decode(&canaries); err != nil { - return fmt.Errorf("error decoding response: %w", err) - } - - return db.Gorm.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "id"}}, - UpdateAll: true, - }).Create(&canaries).Error -} - -type PushData struct { - AgentName string `json:"agent_name,omitempty"` - Checks []models.Check `json:"checks,omitempty"` - CheckStatuses []models.CheckStatus `json:"check_statuses,omitempty"` -} - -func (t *PushData) Empty() bool { - return len(t.Checks) == 0 && len(t.CheckStatuses) == 0 -} - -func pushToUpstream(data PushData) error { - data.AgentName = UpstreamConf.AgentName - payloadBuf := new(bytes.Buffer) - if err := json.NewEncoder(payloadBuf).Encode(data); err != nil { - return fmt.Errorf("error encoding msg: %w", err) - } - - endpoint, err := url.JoinPath(UpstreamConf.Host, "upstream", "push") - if err != nil { - return fmt.Errorf("error creating url endpoint for host %s: %w", UpstreamConf.Host, err) - } - - req, err := http.NewRequest(http.MethodPost, endpoint, payloadBuf) - if err != nil { - return fmt.Errorf("http.NewRequest: %w", err) - } - - req.SetBasicAuth(UpstreamConf.Username, UpstreamConf.Password) - - httpClient := &http.Client{} - resp, err := httpClient.Do(req) - if err != nil { - return fmt.Errorf("error making request: %w", err) - } - defer resp.Body.Close() - - if !collections.Contains([]int{http.StatusOK, http.StatusCreated}, resp.StatusCode) { - respBody, _ := io.ReadAll(resp.Body) - return fmt.Errorf("upstream server returned error status[%d]: %s", resp.StatusCode, string(respBody)) - } - - return nil -} diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go new file mode 100644 index 000000000..c59d2e9a5 --- /dev/null +++ b/pkg/jobs/canary/sync_upstream.go @@ -0,0 +1,101 @@ +package canary + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/flanksource/canary-checker/pkg/db" + "github.com/flanksource/commons/collections" + "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/models" + "gorm.io/gorm/clause" +) + +func Pull() { + if err := pull(UpstreamConf); err != nil { + logger.Errorf("Error pulling upstream: %v", err) + } +} + +func pull(config UpstreamConfig) error { + endpoint, err := url.JoinPath(UpstreamConf.Host, "upstream", "canary", "pull", config.AgentName) + if err != nil { + return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) + } + + req, err := http.NewRequest(http.MethodGet, endpoint, nil) + if err != nil { + return err + } + + req.SetBasicAuth(UpstreamConf.Username, UpstreamConf.Password) + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + var canaries []models.Canary + if err := json.NewDecoder(resp.Body).Decode(&canaries); err != nil { + return fmt.Errorf("error decoding response: %w", err) + } + + if len(canaries) == 0 { + return nil + } + + return db.Gorm.Omit("agent_id").Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }).Create(&canaries).Error +} + +type PushData struct { + AgentName string `json:"agent_name,omitempty"` + Checks []models.Check `json:"checks,omitempty"` + CheckStatuses []models.CheckStatus `json:"check_statuses,omitempty"` +} + +func (t *PushData) Empty() bool { + return len(t.Checks) == 0 && len(t.CheckStatuses) == 0 +} + +func pushToUpstream(data PushData) error { + data.AgentName = UpstreamConf.AgentName + payloadBuf := new(bytes.Buffer) + if err := json.NewEncoder(payloadBuf).Encode(data); err != nil { + return fmt.Errorf("error encoding msg: %w", err) + } + + endpoint, err := url.JoinPath(UpstreamConf.Host, "upstream", "push") + if err != nil { + return fmt.Errorf("error creating url endpoint for host %s: %w", UpstreamConf.Host, err) + } + + req, err := http.NewRequest(http.MethodPost, endpoint, payloadBuf) + if err != nil { + return fmt.Errorf("http.NewRequest: %w", err) + } + + req.SetBasicAuth(UpstreamConf.Username, UpstreamConf.Password) + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + if !collections.Contains([]int{http.StatusOK, http.StatusCreated}, resp.StatusCode) { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("upstream server returned error status[%d]: %s", resp.StatusCode, string(respBody)) + } + + return nil +} From 8c9a0807478e945f5a4866099d5dfb965e7c979e Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 25 Jul 2023 17:35:34 +0545 Subject: [PATCH 05/10] chore: use reconcile from duty [skip ci] --- go.mod | 2 + pkg/cache/postgres.go | 27 ++-------- pkg/jobs/canary/canary_jobs.go | 32 +----------- pkg/jobs/canary/sync_upstream.go | 90 ++++++++++++++------------------ pkg/jobs/jobs.go | 9 +++- 5 files changed, 56 insertions(+), 104 deletions(-) diff --git a/go.mod b/go.mod index 50df4eacd..9f7c4ef0b 100644 --- a/go.mod +++ b/go.mod @@ -277,3 +277,5 @@ require ( sigs.k8s.io/kustomize/kyaml v0.14.2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) + +replace "github.com/flanksource/duty" => ../duty \ No newline at end of file diff --git a/pkg/cache/postgres.go b/pkg/cache/postgres.go index b893e73ae..5560fa308 100644 --- a/pkg/cache/postgres.go +++ b/pkg/cache/postgres.go @@ -27,11 +27,8 @@ func NewPostgresCache(pool *pgxpool.Pool) *postgresCache { } } -func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) ([]string, []models.CheckStatus) { - var ( - checkIDs = make([]string, 0, len(statii)) - checkStatuses = make([]models.CheckStatus, 0, len(statii)) - ) +func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) []string { + var checkIDs = make([]string, 0, len(statii)) for _, status := range statii { if status.Status { @@ -46,22 +43,10 @@ func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) ([]strin } else { checkIDs = append(checkIDs, checkID.String()) } - - s := &models.CheckStatus{ - Status: status.Status, - Invalid: status.Invalid, - Time: status.Time, - Duration: int(status.Duration), - Message: status.Message, - Error: status.Error, - Detail: status.Detail, - } - - c.AddCheckStatus(check, s) - checkStatuses = append(checkStatuses, *s) + c.AddCheckStatus(check, status) } - return checkIDs, checkStatuses + return checkIDs } func (c *postgresCache) AddCheckFromStatus(check pkg.Check, status pkg.CheckStatus) (uuid.UUID, error) { @@ -76,7 +61,7 @@ func (c *postgresCache) AddCheckFromStatus(check pkg.Check, status pkg.CheckStat return db.PersistCheck(check, check.CanaryID) } -func (c *postgresCache) AddCheckStatus(check pkg.Check, status *models.CheckStatus) { +func (c *postgresCache) AddCheckStatus(check pkg.Check, status pkg.CheckStatus) { jsonDetails, err := json.Marshal(status.Detail) if err != nil { logger.Errorf("error marshalling details: %v", err) @@ -95,8 +80,6 @@ func (c *postgresCache) AddCheckStatus(check pkg.Check, status *models.CheckStat logger.Debugf("check not found") return } - status.CheckID = checks[0].ID - _, err = c.Exec(context.TODO(), `INSERT INTO check_statuses( check_id, details, diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 3fa537a65..5dc1b4e4b 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -32,19 +32,6 @@ var DataFile string var Executor bool var LogPass, LogFail bool -type UpstreamConfig struct { - AgentName string - Host string - Username string - Password string -} - -func (t *UpstreamConfig) Valid() bool { - return t.Host != "" && t.Username != "" && t.Password != "" && t.AgentName != "" -} - -var UpstreamConf UpstreamConfig - var Kommons *kommons.Client var Kubernetes kubernetes.Interface var FuncScheduler = cron.New() @@ -121,33 +108,16 @@ func (job CanaryJob) Run() { return } - pushData := PushData{ - CheckStatuses: make([]models.CheckStatus, 0, len(results)), - } - for _, result := range results { if job.LogPass && result.Pass || job.LogFail && !result.Pass { logger.Infof(result.String()) } - - transformedChecksAdded, checkStatus := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.FromResult(*result)) - - if UpstreamConf.Valid() { - pushData.CheckStatuses = append(pushData.CheckStatuses, checkStatus...) - } - + transformedChecksAdded := cache.PostgresCache.Add(pkg.FromV1(result.Canary, result.Check), pkg.FromResult(*result)) transformedChecksCreated = append(transformedChecksCreated, transformedChecksAdded...) for _, checkID := range transformedChecksAdded { checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy() } } - - if UpstreamConf.Valid() && !pushData.Empty() { - if err := pushToUpstream(pushData); err != nil { - logger.Errorf("failed to push check results to upstream: %v", err) - } - } - job.updateStatusAndEvent(results) checkDeleteStrategyGroup := make(map[string][]string) diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index c59d2e9a5..fa381a1f0 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -1,28 +1,62 @@ package canary import ( - "bytes" "encoding/json" "fmt" - "io" "net/http" "net/url" + "github.com/flanksource/canary-checker/api/context" + v1 "github.com/flanksource/canary-checker/api/v1" "github.com/flanksource/canary-checker/pkg/db" - "github.com/flanksource/commons/collections" "github.com/flanksource/commons/logger" "github.com/flanksource/duty/models" + "github.com/flanksource/duty/upstream" "gorm.io/gorm/clause" ) +var UpstreamConf upstream.UpstreamConfig + +var tablesToReconcile = []string{ + "canaries", + "checks", +} + +// SyncWithUpstream coordinates with upstream and pushes any resource +// that are missing on the upstream. +func SyncWithUpstream() { + ctx := context.New(nil, nil, db.Gorm, v1.Canary{}) + + jobHistory := models.NewJobHistory("SyncCanaryResultsWithUpstream", "Canary", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + syncer := upstream.NewUpstreamSyncer(UpstreamConf) + for _, table := range tablesToReconcile { + if err := syncer.SyncTableWithUpstream(ctx, table); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("failed to sync table %s: %w", table, err) + } else { + jobHistory.IncrSuccess() + } + } +} + func Pull() { + jobHistory := models.NewJobHistory("PullAgentCanaries", "Canary", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + if err := pull(UpstreamConf); err != nil { + jobHistory.AddError(err.Error()) logger.Errorf("Error pulling upstream: %v", err) } + + jobHistory.IncrSuccess() } -func pull(config UpstreamConfig) error { - endpoint, err := url.JoinPath(UpstreamConf.Host, "upstream", "canary", "pull", config.AgentName) +func pull(config upstream.UpstreamConfig) error { + endpoint, err := url.JoinPath(config.Host, "upstream", "canary", "pull", config.AgentName) if err != nil { return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) } @@ -32,7 +66,7 @@ func pull(config UpstreamConfig) error { return err } - req.SetBasicAuth(UpstreamConf.Username, UpstreamConf.Password) + req.SetBasicAuth(config.Username, config.Password) httpClient := &http.Client{} resp, err := httpClient.Do(req) @@ -55,47 +89,3 @@ func pull(config UpstreamConfig) error { UpdateAll: true, }).Create(&canaries).Error } - -type PushData struct { - AgentName string `json:"agent_name,omitempty"` - Checks []models.Check `json:"checks,omitempty"` - CheckStatuses []models.CheckStatus `json:"check_statuses,omitempty"` -} - -func (t *PushData) Empty() bool { - return len(t.Checks) == 0 && len(t.CheckStatuses) == 0 -} - -func pushToUpstream(data PushData) error { - data.AgentName = UpstreamConf.AgentName - payloadBuf := new(bytes.Buffer) - if err := json.NewEncoder(payloadBuf).Encode(data); err != nil { - return fmt.Errorf("error encoding msg: %w", err) - } - - endpoint, err := url.JoinPath(UpstreamConf.Host, "upstream", "push") - if err != nil { - return fmt.Errorf("error creating url endpoint for host %s: %w", UpstreamConf.Host, err) - } - - req, err := http.NewRequest(http.MethodPost, endpoint, payloadBuf) - if err != nil { - return fmt.Errorf("http.NewRequest: %w", err) - } - - req.SetBasicAuth(UpstreamConf.Username, UpstreamConf.Password) - - httpClient := &http.Client{} - resp, err := httpClient.Do(req) - if err != nil { - return fmt.Errorf("error making request: %w", err) - } - defer resp.Body.Close() - - if !collections.Contains([]int{http.StatusOK, http.StatusCreated}, resp.StatusCode) { - respBody, _ := io.ReadAll(resp.Body) - return fmt.Errorf("upstream server returned error status[%d]: %s", resp.StatusCode, string(respBody)) - } - - return nil -} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index e59b9eddf..a68d420f1 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -41,10 +41,17 @@ func Start() { if canaryJobs.UpstreamConf.Valid() { canaryJobs.Pull() + canaryJobs.SyncWithUpstream() + if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.Pull); err != nil { - logger.Errorf("Failed to pull canaries from central server: %v", err) + logger.Errorf("Failed to schedule job [canaryJobs.Pull]: %v", err) + } + + if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncWithUpstream); err != nil { + logger.Errorf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err) } } + if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs); err != nil { logger.Errorf("Failed to schedule sync jobs for canary: %v", err) } From 33cea7873740af84a89c054bc6e6ed796c8ff784 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 26 Jul 2023 10:26:11 +0545 Subject: [PATCH 06/10] chore: fix typo [skip ci] --- pkg/jobs/canary/sync_upstream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index fa381a1f0..909eef6bd 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -18,8 +18,8 @@ import ( var UpstreamConf upstream.UpstreamConfig var tablesToReconcile = []string{ - "canaries", "checks", + "check_statuses", } // SyncWithUpstream coordinates with upstream and pushes any resource @@ -31,7 +31,7 @@ func SyncWithUpstream() { _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - syncer := upstream.NewUpstreamSyncer(UpstreamConf) + syncer := upstream.NewUpstreamSyncer(UpstreamConf, 500) for _, table := range tablesToReconcile { if err := syncer.SyncTableWithUpstream(ctx, table); err != nil { jobHistory.AddError(err.Error()) From 6871d93d3b05281ee25665399e66160e3b209d9d Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 2 Aug 2023 14:23:43 +0545 Subject: [PATCH 07/10] chore: bump duty --- go.mod | 2 -- pkg/jobs/canary/sync_upstream.go | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 9f7c4ef0b..50df4eacd 100644 --- a/go.mod +++ b/go.mod @@ -277,5 +277,3 @@ require ( sigs.k8s.io/kustomize/kyaml v0.14.2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect ) - -replace "github.com/flanksource/duty" => ../duty \ No newline at end of file diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index 909eef6bd..5c0fb20ab 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -31,11 +31,11 @@ func SyncWithUpstream() { _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - syncer := upstream.NewUpstreamSyncer(UpstreamConf, 500) + reconciler := upstream.NewUpstreamReconciler(UpstreamConf, 5) for _, table := range tablesToReconcile { - if err := syncer.SyncTableWithUpstream(ctx, table); err != nil { + if err := reconciler.Sync(ctx, table); err != nil { jobHistory.AddError(err.Error()) - logger.Errorf("failed to sync table %s: %w", table, err) + logger.Errorf("failed to sync table %s: %v", table, err) } else { jobHistory.IncrSuccess() } From bd6f41a58ce8446c04e7e89cc493f403d2506233 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 27 Jul 2023 15:33:49 +0545 Subject: [PATCH 08/10] chore: cleanup --- pkg/cache/postgres.go | 2 +- pkg/jobs/canary/sync_upstream.go | 14 +++++++------- pkg/jobs/jobs.go | 8 +++++--- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/cache/postgres.go b/pkg/cache/postgres.go index 5560fa308..f49bbbfba 100644 --- a/pkg/cache/postgres.go +++ b/pkg/cache/postgres.go @@ -28,7 +28,7 @@ func NewPostgresCache(pool *pgxpool.Pool) *postgresCache { } func (c *postgresCache) Add(check pkg.Check, statii ...pkg.CheckStatus) []string { - var checkIDs = make([]string, 0, len(statii)) + checkIDs := make([]string, 0, len(statii)) for _, status := range statii { if status.Status { diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index 5c0fb20ab..cedfb26be 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -22,12 +22,12 @@ var tablesToReconcile = []string{ "check_statuses", } -// SyncWithUpstream coordinates with upstream and pushes any resource +// PushCanaryResultsToUpstream coordinates with upstream and pushes any resource // that are missing on the upstream. -func SyncWithUpstream() { +func PushCanaryResultsToUpstream() { ctx := context.New(nil, nil, db.Gorm, v1.Canary{}) - jobHistory := models.NewJobHistory("SyncCanaryResultsWithUpstream", "Canary", "") + jobHistory := models.NewJobHistory("PushCanaryResultsToUpstream", "Canary", "") _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() @@ -43,16 +43,16 @@ func SyncWithUpstream() { } func Pull() { - jobHistory := models.NewJobHistory("PullAgentCanaries", "Canary", "") + jobHistory := models.NewJobHistory("PullUpstreamCanaries", "Canary", "") _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() if err := pull(UpstreamConf); err != nil { jobHistory.AddError(err.Error()) logger.Errorf("Error pulling upstream: %v", err) + } else { + jobHistory.IncrSuccess() } - - jobHistory.IncrSuccess() } func pull(config upstream.UpstreamConfig) error { @@ -63,7 +63,7 @@ func pull(config upstream.UpstreamConfig) error { req, err := http.NewRequest(http.MethodGet, endpoint, nil) if err != nil { - return err + return fmt.Errorf("error creating new http request: %w", err) } req.SetBasicAuth(config.Username, config.Password) diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index a68d420f1..d6d0caaf1 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -14,6 +14,8 @@ import ( var FuncScheduler = cron.New() const ( + PullCanaryFromUpstreamSchedule = "@every 2m" + PushCanaryToUpstreamSchedule = "@every 5m" SyncCanaryJobsSchedule = "@every 2m" SyncSystemsJobsSchedule = "@every 5m" ComponentRunSchedule = "@every 2m" @@ -41,13 +43,13 @@ func Start() { if canaryJobs.UpstreamConf.Valid() { canaryJobs.Pull() - canaryJobs.SyncWithUpstream() + canaryJobs.PushCanaryResultsToUpstream() - if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.Pull); err != nil { + if _, err := ScheduleFunc(PullCanaryFromUpstreamSchedule, canaryJobs.Pull); err != nil { logger.Errorf("Failed to schedule job [canaryJobs.Pull]: %v", err) } - if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncWithUpstream); err != nil { + if _, err := ScheduleFunc(PushCanaryToUpstreamSchedule, canaryJobs.PushCanaryResultsToUpstream); err != nil { logger.Errorf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err) } } From bda21d90d0b5bcb4cde6660ab4bf50155eae59d2 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Mon, 31 Jul 2023 16:18:28 +0545 Subject: [PATCH 09/10] feat: create new job that pushes the canary results to upstream more frequently --- pkg/jobs/canary/sync_upstream.go | 81 +++++++++++++++++++++++++++++--- pkg/jobs/jobs.go | 24 +++++++--- 2 files changed, 92 insertions(+), 13 deletions(-) diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index cedfb26be..b033320e6 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -1,10 +1,12 @@ package canary import ( + goctx "context" "encoding/json" "fmt" "net/http" "net/url" + "time" "github.com/flanksource/canary-checker/api/context" v1 "github.com/flanksource/canary-checker/api/v1" @@ -22,9 +24,9 @@ var tablesToReconcile = []string{ "check_statuses", } -// PushCanaryResultsToUpstream coordinates with upstream and pushes any resource +// ReconcileCanaryResults coordinates with upstream and pushes any resource // that are missing on the upstream. -func PushCanaryResultsToUpstream() { +func ReconcileCanaryResults() { ctx := context.New(nil, nil, db.Gorm, v1.Canary{}) jobHistory := models.NewJobHistory("PushCanaryResultsToUpstream", "Canary", "") @@ -42,20 +44,27 @@ func PushCanaryResultsToUpstream() { } } -func Pull() { +// UpstreamPullJob pulls canaries from the upstream +type UpstreamPullJob struct { + lastRuntime time.Time +} + +func (t *UpstreamPullJob) Run() { jobHistory := models.NewJobHistory("PullUpstreamCanaries", "Canary", "") _ = db.PersistJobHistory(jobHistory.Start()) defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() - if err := pull(UpstreamConf); err != nil { + if err := t.pull(UpstreamConf); err != nil { jobHistory.AddError(err.Error()) - logger.Errorf("Error pulling upstream: %v", err) + logger.Errorf("error pulling from upstream: %v", err) } else { jobHistory.IncrSuccess() } } -func pull(config upstream.UpstreamConfig) error { +func (t *UpstreamPullJob) pull(config upstream.UpstreamConfig) error { + logger.Tracef("pulling canaries from upstream since: %v", t.lastRuntime) + endpoint, err := url.JoinPath(config.Host, "upstream", "canary", "pull", config.AgentName) if err != nil { return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) @@ -68,6 +77,10 @@ func pull(config upstream.UpstreamConfig) error { req.SetBasicAuth(config.Username, config.Password) + params := url.Values{} + params.Add("since", t.lastRuntime.Format(time.RFC3339)) + req.URL.RawQuery = params.Encode() + httpClient := &http.Client{} resp, err := httpClient.Do(req) if err != nil { @@ -80,12 +93,68 @@ func pull(config upstream.UpstreamConfig) error { return fmt.Errorf("error decoding response: %w", err) } + t.lastRuntime = time.Now() + if len(canaries) == 0 { return nil } + logger.Tracef("fetched %d canaries from upstream", len(canaries)) + return db.Gorm.Omit("agent_id").Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "id"}}, UpdateAll: true, }).Create(&canaries).Error } + +type UpstreamPushJob struct { + lastRuntime time.Time + + // MaxAge defines how far back we look into the past on startup whe + // lastRuntime is zero. + MaxAge time.Duration +} + +func (t *UpstreamPushJob) Run() { + jobHistory := models.NewJobHistory("UpstreamPushJob", "Canary", "") + _ = db.PersistJobHistory(jobHistory.Start()) + defer func() { _ = db.PersistJobHistory(jobHistory.End()) }() + + if err := t.run(); err != nil { + jobHistory.AddError(err.Error()) + logger.Errorf("error pushing to upstream: %v", err) + } else { + jobHistory.IncrSuccess() + } +} + +func (t *UpstreamPushJob) run() error { + logger.Tracef("running upstream push job") + + var currentTime time.Time + if err := db.Gorm.Raw("SELECT NOW()").Scan(¤tTime).Error; err != nil { + return err + } + + if t.lastRuntime.IsZero() { + t.lastRuntime = currentTime.Add(-t.MaxAge) + } + + pushData := &upstream.PushData{AgentName: UpstreamConf.AgentName} + if err := db.Gorm.Where("created_at > ?", t.lastRuntime).Find(&pushData.CheckStatuses).Error; err != nil { + return err + } + + if err := db.Gorm.Where("updated_at > ?", t.lastRuntime).Find(&pushData.Checks).Error; err != nil { + return err + } + + t.lastRuntime = currentTime + + if pushData.Count() == 0 { + return nil + } + logger.Tracef("pushing %d canary results to upstream", pushData.Count()) + + return upstream.Push(goctx.Background(), UpstreamConf, pushData) +} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index d6d0caaf1..7b18e99a8 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -1,6 +1,8 @@ package jobs import ( + "time" + "github.com/flanksource/canary-checker/pkg/db" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" systemJobs "github.com/flanksource/canary-checker/pkg/jobs/system" @@ -15,7 +17,8 @@ var FuncScheduler = cron.New() const ( PullCanaryFromUpstreamSchedule = "@every 2m" - PushCanaryToUpstreamSchedule = "@every 5m" + PushCanaryToUpstreamSchedule = "@every 10s" + ReconcileCanaryToUpstreamSchedule = "@every 3h" SyncCanaryJobsSchedule = "@every 2m" SyncSystemsJobsSchedule = "@every 5m" ComponentRunSchedule = "@every 2m" @@ -42,15 +45,22 @@ func Start() { FuncScheduler.Start() if canaryJobs.UpstreamConf.Valid() { - canaryJobs.Pull() - canaryJobs.PushCanaryResultsToUpstream() + pushJob := &canaryJobs.UpstreamPushJob{MaxAge: time.Minute * 5} + pushJob.Run() + + pullJob := &canaryJobs.UpstreamPullJob{} + pullJob.Run() + + if _, err := FuncScheduler.AddJob(PullCanaryFromUpstreamSchedule, pullJob); err != nil { + logger.Fatalf("Failed to schedule job [canaryJobs.Pull]: %v", err) + } - if _, err := ScheduleFunc(PullCanaryFromUpstreamSchedule, canaryJobs.Pull); err != nil { - logger.Errorf("Failed to schedule job [canaryJobs.Pull]: %v", err) + if _, err := FuncScheduler.AddJob(PushCanaryToUpstreamSchedule, pushJob); err != nil { + logger.Fatalf("Failed to schedule job [canaryJobs.UpstreamPushJob]: %v", err) } - if _, err := ScheduleFunc(PushCanaryToUpstreamSchedule, canaryJobs.PushCanaryResultsToUpstream); err != nil { - logger.Errorf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err) + if _, err := ScheduleFunc(ReconcileCanaryToUpstreamSchedule, canaryJobs.ReconcileCanaryResults); err != nil { + logger.Fatalf("Failed to schedule job [canaryJobs.SyncWithUpstream]: %v", err) } } From 5c72f6e3d5c660c9ffbb77e8f14d98dbf3b5be37 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 2 Aug 2023 14:21:46 +0545 Subject: [PATCH 10/10] use the new canary endpoint response from incident commander --- pkg/jobs/canary/sync_upstream.go | 17 +++++++++++------ pkg/jobs/jobs.go | 2 +- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/jobs/canary/sync_upstream.go b/pkg/jobs/canary/sync_upstream.go index b033320e6..fda30d893 100644 --- a/pkg/jobs/canary/sync_upstream.go +++ b/pkg/jobs/canary/sync_upstream.go @@ -44,6 +44,11 @@ func ReconcileCanaryResults() { } } +type CanaryPullResponse struct { + Before time.Time `json:"before"` + Canaries []models.Canary `json:"canaries,omitempty"` +} + // UpstreamPullJob pulls canaries from the upstream type UpstreamPullJob struct { lastRuntime time.Time @@ -88,23 +93,23 @@ func (t *UpstreamPullJob) pull(config upstream.UpstreamConfig) error { } defer resp.Body.Close() - var canaries []models.Canary - if err := json.NewDecoder(resp.Body).Decode(&canaries); err != nil { + var response CanaryPullResponse + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { return fmt.Errorf("error decoding response: %w", err) } - t.lastRuntime = time.Now() + t.lastRuntime = response.Before - if len(canaries) == 0 { + if len(response.Canaries) == 0 { return nil } - logger.Tracef("fetched %d canaries from upstream", len(canaries)) + logger.Tracef("fetched %d canaries from upstream", len(response.Canaries)) return db.Gorm.Omit("agent_id").Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "id"}}, UpdateAll: true, - }).Create(&canaries).Error + }).Create(&response.Canaries).Error } type UpstreamPushJob struct { diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 7b18e99a8..03dac4a01 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -16,7 +16,7 @@ import ( var FuncScheduler = cron.New() const ( - PullCanaryFromUpstreamSchedule = "@every 2m" + PullCanaryFromUpstreamSchedule = "@every 30s" PushCanaryToUpstreamSchedule = "@every 10s" ReconcileCanaryToUpstreamSchedule = "@every 3h" SyncCanaryJobsSchedule = "@every 2m"