Skip to content

Commit

Permalink
draft: agent sync
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Jul 18, 2023
1 parent 14a043d commit 489b480
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/flanksource/canary-checker/pkg/cache"
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/canary-checker/templating"
"github.com/flanksource/commons/logger"
Expand Down Expand Up @@ -56,6 +57,12 @@ func ServerFlags(flags *pflag.FlagSet) {
flags.IntVar(&db.CheckStatusRetentionDays, "check-status-retention-period", db.DefaultCheckStatusRetentionDays, "Check status retention period in days")
flags.IntVar(&db.CheckRetentionDays, "check-retention-period", db.DefaultCheckRetentionDays, "Check retention period in days")
flags.IntVar(&db.CanaryRetentionDays, "canary-retention-period", db.DefaultCanaryRetentionDays, "Canary retention period in days")

// Flags for push/pull
flags.StringVar(&canary.UpstreamConf.Host, "upstream-host", "", "central canary checker instance to push/pull canaries")
flags.StringVar(&canary.UpstreamConf.Username, "upstream-user", "", "upstream username")
flags.StringVar(&canary.UpstreamConf.Password, "upstream-password", "", "upstream password")
flags.StringVar(&canary.UpstreamConf.AgentName, "agent-name", "", "name of this agent")
}

func readFromEnv(v string) string {
Expand Down
1 change: 1 addition & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func serve() {
e.GET("/api/summary", api.HealthSummary)
e.GET("/about", api.About)
e.GET("/api/graph", api.CheckDetails)
e.GET("/api/pull/:agent_name", api.Pull)
e.POST("/api/push", api.PushHandler)
e.GET("/api/details", api.DetailsHandler)
e.GET("/api/topology", api.Topology)
Expand Down
21 changes: 21 additions & 0 deletions pkg/api/pull.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package api

import (
"net/http"

"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/duty/models"
"github.com/labstack/echo/v4"
)

// Pull returns all canaries for the requested agent
func Pull(c echo.Context) error {
agentName := c.Param("agent_name")

var canaries []models.Canary
if err := db.Gorm.Where("deleted_at IS NULL").Joins("LEFT JOIN agents ON canaries.agent_id = agents.id").Where("agents.name = ?", agentName).Find(&canaries).Error; err != nil {
return errorResonse(c, err, http.StatusInternalServerError)
}

return c.JSON(http.StatusOK, canaries)
}
42 changes: 42 additions & 0 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package canary

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"
"sync"
"time"
Expand All @@ -19,6 +22,7 @@ import (
"github.com/flanksource/duty/models"
"github.com/flanksource/kommons"
"github.com/robfig/cron/v3"
"gorm.io/gorm/clause"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
Expand All @@ -30,6 +34,15 @@ var DataFile string
var Executor bool
var LogPass, LogFail bool

type UpstreamConfig struct {
AgentName string
Host string
Username string
Password string
}

var UpstreamConf UpstreamConfig

var Kommons *kommons.Client
var Kubernetes kubernetes.Interface
var FuncScheduler = cron.New()
Expand Down Expand Up @@ -405,3 +418,32 @@ func init() {
// We are adding a small buffer to prevent blocking
CanaryStatusChannel = make(chan CanaryStatusPayload, 64)
}

func Pull() {
if err := pull(UpstreamConf); err != nil {
logger.Errorf("Error pulling upstream: %v", err)
}
}

func pull(config UpstreamConfig) error {
endpoint, err := url.JoinPath(UpstreamConf.Host, "api", "pull")
if err != nil {
return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err)
}

resp, err := http.Get(endpoint)
if err != nil {
return err
}
defer resp.Body.Close()

var canaries []models.Canary
if err := json.NewDecoder(resp.Body).Decode(&canaries); err != nil {
return fmt.Errorf("error decoding response: %w", err)
}

return db.Gorm.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
UpdateAll: true,
}).Create(&canaries).Error
}
5 changes: 5 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func Start() {
canaryJobs.CanaryScheduler.Start()
FuncScheduler.Start()

if canaryJobs.UpstreamConf.AgentName != "" {
if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.Pull); err != nil {
logger.Errorf("Failed to pull canaries from central server: %v", err)
}
}
if _, err := ScheduleFunc(SyncCanaryJobsSchedule, canaryJobs.SyncCanaryJobs); err != nil {
logger.Errorf("Failed to schedule sync jobs for canary: %v", err)
}
Expand Down

0 comments on commit 489b480

Please sign in to comment.