diff --git a/cmd/operator.go b/cmd/operator.go index 0a9dc0b53..7880bdfa7 100644 --- a/cmd/operator.go +++ b/cmd/operator.go @@ -4,12 +4,15 @@ import ( "fmt" "os" "strings" + "time" "github.com/flanksource/canary-checker/pkg/cache" "github.com/flanksource/canary-checker/pkg/db" "github.com/flanksource/canary-checker/pkg/jobs" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" "github.com/flanksource/canary-checker/pkg/runner" + "github.com/flanksource/canary-checker/pkg/utils" + gocache "github.com/patrickmn/go-cache" canaryv1 "github.com/flanksource/canary-checker/api/v1" "github.com/flanksource/canary-checker/pkg" @@ -21,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + ctrlCache "sigs.k8s.io/controller-runtime/pkg/cache" ctrlzap "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -89,6 +93,9 @@ func run(cmd *cobra.Command, args []string) { LeaderElection: enableLeaderElection, LeaderElectionNamespace: operatorNamespace, LeaderElectionID: "bc88107d.flanksource.com", + Cache: ctrlCache.Options{ + SyncPeriod: utils.Ptr(1 * time.Hour), + }, }) if err != nil { setupLog.Error(err, "unable to start manager") @@ -115,6 +122,7 @@ func run(cmd *cobra.Command, args []string) { Log: ctrl.Log.WithName("controllers").WithName("canary"), Scheme: mgr.GetScheme(), RunnerName: runner.RunnerName, + CanaryCache: gocache.New(7*24*time.Hour, 1*time.Hour), } systemReconciler := &controllers.TopologyReconciler{ diff --git a/go.mod b/go.mod index 9d7a5f384..061d9d80b 100644 --- a/go.mod +++ b/go.mod @@ -201,6 +201,7 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect diff --git a/go.sum b/go.sum index 214a38288..8d6b1fc29 100644 --- a/go.sum +++ b/go.sum @@ -1312,6 +1312,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1 h1:dOYG7LS/WK00RWZc8XGgcUTlTxpp3mKhdR2Q9z9HbXM= +github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1/go.mod h1:mpRZBD8SJ55OIICQ3iWH0Yz3cjzA61JdqMLoWXeB2+8= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/api.go b/pkg/api.go index 55c889f75..02dcaddac 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -116,14 +116,14 @@ type Timeseries struct { type Canary struct { ID uuid.UUID `gorm:"default:generate_ulid()"` AgentID uuid.UUID - Spec types.JSON - Labels types.JSONStringMap + Spec types.JSON `json:"spec"` + Labels types.JSONStringMap `json:"labels"` Source string Name string Namespace string Checks types.JSONStringMap `gorm:"-"` CreatedAt time.Time - UpdatedAt time.Time + UpdatedAt time.Time `json:"updated_at"` DeletedAt *time.Time `json:"deleted_at,omitempty" time_format:"postgres_timestamp"` } diff --git a/pkg/controllers/canary_controller.go b/pkg/controllers/canary_controller.go index 3e7b1d4b6..1dd374505 100644 --- a/pkg/controllers/canary_controller.go +++ b/pkg/controllers/canary_controller.go @@ -29,6 +29,9 @@ import ( canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" "github.com/flanksource/kommons" "github.com/go-logr/logr" + jsontime "github.com/liamylian/jsontime/v2/v2" + "github.com/nsf/jsondiff" + "github.com/patrickmn/go-cache" "github.com/robfig/cron/v3" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -39,20 +42,23 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) +var json = jsontime.ConfigWithCustomTimeFormat + // CanaryReconciler reconciles a Canary object type CanaryReconciler struct { IncludeCheck string IncludeNamespaces []string LogPass, LogFail bool client.Client - Kubernetes kubernetes.Interface - Kommons *kommons.Client - Log logr.Logger - Scheme *runtime.Scheme - Events record.EventRecorder - Cron *cron.Cron - RunnerName string - Done chan *pkg.CheckResult + Kubernetes kubernetes.Interface + Kommons *kommons.Client + Log logr.Logger + Scheme *runtime.Scheme + Events record.EventRecorder + Cron *cron.Cron + RunnerName string + Done chan *pkg.CheckResult + CanaryCache *cache.Cache } const FinalizerName = "canary.canaries.flanksource.com" @@ -95,13 +101,13 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c return ctrl.Result{}, r.Update(ctx, canary) } - dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID()) + dbCanary, err := r.updateCanaryInDB(canary) if err != nil { return ctrl.Result{Requeue: true}, err } // Sync jobs if canary is created or updated - if canary.Generation == 1 || changed { + if canary.Generation == 1 { if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil { logger.Error(err, "failed to sync canary job") return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err @@ -117,7 +123,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c } patch := client.MergeFrom(canaryForStatus.DeepCopy()) - canaryForStatus.Status.Checks = checks + canaryForStatus.Status.Checks = dbCanary.Checks canaryForStatus.Status.ObservedGeneration = canary.Generation if err = r.Status().Patch(ctx, &canaryForStatus, patch); err != nil { logger.Error(err, "failed to update status for canary") @@ -133,6 +139,50 @@ func (r *CanaryReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } +func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary, error) { + dbCanary, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID()) + if err != nil { + return nil, err + } + r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration) + + if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil { + return nil, err + } + return dbCanary, nil +} + +func (r *CanaryReconciler) updateCanaryInDB(canary *v1.Canary) (*pkg.Canary, error) { + var dbCanary *pkg.Canary + var err error + + // Get DBCanary from cache if exists else persist in database and update cache + if cacheObj, exists := r.CanaryCache.Get(canary.GetPersistedID()); !exists { + dbCanary, err = r.persistAndCacheCanary(canary) + if err != nil { + return nil, err + } + } else { + dbCanary = cacheObj.(*pkg.Canary) + } + + // Compare canary spec and spec in database + // If they do not match, persist the canary in database + canarySpecJSON, err := json.Marshal(canary.Spec) + if err != nil { + return nil, err + } + opts := jsondiff.DefaultJSONOptions() + if diff, _ := jsondiff.Compare(canarySpecJSON, dbCanary.Spec, &opts); diff != jsondiff.FullMatch { + dbCanary, err = r.persistAndCacheCanary(canary) + if err != nil { + return nil, err + } + } + + return dbCanary, nil +} + func (r *CanaryReconciler) Report() { for payload := range canaryJobs.CanaryStatusChannel { var canary v1.Canary diff --git a/pkg/db/canary.go b/pkg/db/canary.go index 490cc094c..b0014b253 100644 --- a/pkg/db/canary.go +++ b/pkg/db/canary.go @@ -23,8 +23,6 @@ import ( ) func GetAllCanariesForSync() ([]pkg.Canary, error) { - var _canaries []pkg.Canary - var rawCanaries interface{} query := ` SELECT json_agg( jsonb_set_lax(to_jsonb(canaries),'{checks}', ( @@ -43,24 +41,20 @@ func GetAllCanariesForSync() ([]pkg.Canary, error) { agent_id = '00000000-0000-0000-0000-000000000000' ` - rows, err := Gorm.Raw(query).Rows() + rows, err := Pool.Query(context.Background(), query) if err != nil { return nil, err } - if err := rows.Err(); err != nil { - return nil, err - } - defer rows.Close() + + var _canaries []pkg.Canary for rows.Next() { - if err := rows.Scan(&rawCanaries); err != nil { - return nil, err + if rows.RawValues()[0] == nil { + continue + } + + if err := json.Unmarshal(rows.RawValues()[0], &_canaries); err != nil { + return nil, fmt.Errorf("failed to unmarshal canaries:%w for %s", err, rows.RawValues()[0]) } - } - if rawCanaries == nil { - return nil, nil - } - if err := json.Unmarshal(rawCanaries.([]byte), &_canaries); err != nil { - return nil, err } return _canaries, nil } @@ -294,22 +288,20 @@ func CreateCheck(canary pkg.Canary, check *pkg.Check) error { return Gorm.Create(&check).Error } -func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool, error) { - var err error - changed := false - tx := Gorm.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "agent_id"}, {Name: "name"}, {Name: "namespace"}, {Name: "source"}}, - DoUpdates: clause.AssignmentColumns([]string{"labels", "spec"}), - }).Create(&model) - if tx.RowsAffected > 0 { - changed = true - } +func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, error) { + err := Gorm.Clauses( + clause.OnConflict{ + Columns: []clause.Column{{Name: "agent_id"}, {Name: "name"}, {Name: "namespace"}, {Name: "source"}}, + DoUpdates: clause.AssignmentColumns([]string{"labels", "spec"}), + }, + clause.Returning{}, + ).Create(&model).Error // Duplicate key happens when an already created canary is persisted // We will ignore this error but act on other errors if err != nil { - if !errors.Is(tx.Error, gorm.ErrDuplicatedKey) { - return nil, map[string]string{}, changed, tx.Error + if !errors.Is(err, gorm.ErrDuplicatedKey) { + return nil, err } } @@ -322,12 +314,12 @@ func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool, Error if err != nil { logger.Errorf("Error fetching existing checks for canary:%s", model.ID) - return nil, nil, changed, err + return nil, err } var spec v1.CanarySpec if err = json.Unmarshal(model.Spec, &spec); err != nil { - return nil, nil, changed, err + return nil, err } var checks = make(map[string]string) @@ -355,14 +347,13 @@ func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool, } model.Checks = checks - return &model, checks, changed, nil + return &model, nil } -func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]string, bool, error) { - changed := false +func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, error) { model, err := pkg.CanaryFromV1(canary) if err != nil { - return nil, nil, changed, err + return nil, err } if canary.GetPersistedID() != "" { model.ID, _ = uuid.Parse(canary.GetPersistedID()) diff --git a/pkg/db/canary_selector.go b/pkg/db/canary_selector.go index 633b4e7cf..97eeeb1d2 100644 --- a/pkg/db/canary_selector.go +++ b/pkg/db/canary_selector.go @@ -86,7 +86,7 @@ func CreateComponentCanaryFromInline(id, name, namespace, schedule, owner string }, Spec: *spec, } - canary, _, _, err := PersistCanary(obj, fmt.Sprintf("component/%s", id)) + canary, err := PersistCanary(obj, fmt.Sprintf("component/%s", id)) if err != nil { logger.Debugf("error persisting component inline canary: %v", err) return nil, err diff --git a/pkg/jobs/canary/canary_jobs.go b/pkg/jobs/canary/canary_jobs.go index 89fefaf8d..6f5056172 100644 --- a/pkg/jobs/canary/canary_jobs.go +++ b/pkg/jobs/canary/canary_jobs.go @@ -17,9 +17,7 @@ import ( "github.com/flanksource/canary-checker/pkg/utils" "github.com/flanksource/commons/logger" "github.com/flanksource/duty/models" - dutyTypes "github.com/flanksource/duty/types" "github.com/flanksource/kommons" - "github.com/google/go-cmp/cmp" "github.com/robfig/cron/v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -273,7 +271,7 @@ func ScanCanaryConfigs() { } for _, canary := range configs { - _, _, _, err := db.PersistCanary(canary, path.Base(configfile)) + _, err := db.PersistCanary(canary, path.Base(configfile)) if err != nil { logger.Errorf("could not persist %s: %v", canary.Name, err) } else { @@ -384,66 +382,6 @@ func DeleteCanaryJob(id string) { CanaryScheduler.Remove(entry.ID) } -func ReconcileCanaryChecks() { - logger.Infof("Reconciling Canary Checks") - - jobHistory := models.NewJobHistory("ReconcileCanaryChecks", "", "").Start() - logIfError(db.PersistJobHistory(jobHistory), "failed to persist job history [ReconcileCanaryChecks start]") - defer func() { - logIfError(db.PersistJobHistory(jobHistory.End()), "failed to persist job history [ReconcileCanaryChecks end]") - }() - - canaries, err := db.GetAllCanariesForSync() - if err != nil { - logger.Errorf("Error fetching canaries: %v", err) - jobHistory.AddError(err.Error()) - return - } - canaryCheckMapping := make(map[string]dutyTypes.JSONStringMap) - for _, c := range canaries { - canaryCheckMapping[c.ID.String()] = c.Checks - } - - var rows []struct { - CanaryID string - Checks dutyTypes.JSONStringMap - } - db.Gorm.Raw(` - SELECT json_object_agg(checks.name, checks.id) as checks, canary_id - FROM checks - WHERE - deleted_at IS NULL AND - agent_id = '00000000-0000-0000-0000-000000000000' - GROUP BY canary_id - `).Scan(&rows) - - var idsToPersist []string - for _, r := range rows { - if checks, exists := canaryCheckMapping[r.CanaryID]; exists { - if !cmp.Equal(r.Checks, checks) { - idsToPersist = append(idsToPersist, r.CanaryID) - } - } else { - // If the canaryID is not found in map, that means - // check is not deleted but the canary is - logger.Errorf("Canary[%s] is marked deleted but has active checks") - } - } - - var canariesToPersist []pkg.Canary - if err := db.Gorm.Table("canaries").Where("id IN ?", idsToPersist).Find(&canariesToPersist).Error; err != nil { - logger.Errorf("Error fetching canaries: %v", err) - return - } - - for _, c := range canaries { - if _, _, _, err := db.PersistCanaryModel(c); err != nil { - logger.Errorf("Error persisting canary: %v", err) - jobHistory.AddError(err.Error()) - } - } -} - func ScheduleFunc(schedule string, fn func()) (interface{}, error) { return FuncScheduler.AddFunc(schedule, fn) } diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 03dac4a01..e87817b40 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -109,11 +109,7 @@ func Start() { if _, err := ScheduleFunc(ReconcileDeletedTopologyComponentsSchedule, systemJobs.ReconcileDeletedTopologyComponents); err != nil { logger.Errorf("Failed to schedule ReconcileDeletedTopologyComponents: %v", err) } - if _, err := ScheduleFunc("@every 5m", canaryJobs.ReconcileCanaryChecks); err != nil { - logger.Errorf("Failed to schedule ReconcileCanaryChecks: %v", err) - } - canaryJobs.ReconcileCanaryChecks() canaryJobs.CleanupMetricsGauges() canaryJobs.SyncCanaryJobs() systemJobs.SyncTopologyJobs() diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 8fd9b53f6..0bae0e70f 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -22,7 +22,7 @@ func SyncCanary(dataFile string, configFiles ...string) error { } for _, canary := range configs { - _, _, _, err := db.PersistCanary(canary, path.Base(configfile)) + _, err := db.PersistCanary(canary, path.Base(configfile)) if err != nil { return err }