Skip to content

Commit

Permalink
fix: update on conflict clause for canaries (#1246)
Browse files Browse the repository at this point in the history
* fix: update on conflict clause for canaries

* refactor: use cache for canary crds

* chore: change canary crd reconcile time to 1h
  • Loading branch information
yashmehrotra authored Sep 4, 2023
1 parent 03ee52d commit 73fe256
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 116 deletions.
8 changes: 8 additions & 0 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/flanksource/canary-checker/pkg/cache"
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/jobs"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/canary-checker/pkg/utils"
gocache "github.com/patrickmn/go-cache"

canaryv1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
Expand All @@ -21,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
ctrlCache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrlzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
Expand Down Expand Up @@ -89,6 +93,9 @@ func run(cmd *cobra.Command, args []string) {
LeaderElection: enableLeaderElection,
LeaderElectionNamespace: operatorNamespace,
LeaderElectionID: "bc88107d.flanksource.com",
Cache: ctrlCache.Options{
SyncPeriod: utils.Ptr(1 * time.Hour),
},
})
if err != nil {
setupLog.Error(err, "unable to start manager")
Expand All @@ -115,6 +122,7 @@ func run(cmd *cobra.Command, args []string) {
Log: ctrl.Log.WithName("controllers").WithName("canary"),
Scheme: mgr.GetScheme(),
RunnerName: runner.RunnerName,
CanaryCache: gocache.New(7*24*time.Hour, 1*time.Hour),
}

systemReconciler := &controllers.TopologyReconciler{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ require (
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1 h1:dOYG7LS/WK00RWZc8XGgcUTlTxpp3mKhdR2Q9z9HbXM=
github.com/nsf/jsondiff v0.0.0-20230430225905-43f6cf3098c1/go.mod h1:mpRZBD8SJ55OIICQ3iWH0Yz3cjzA61JdqMLoWXeB2+8=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
6 changes: 3 additions & 3 deletions pkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ type Timeseries struct {
type Canary struct {
ID uuid.UUID `gorm:"default:generate_ulid()"`
AgentID uuid.UUID
Spec types.JSON
Labels types.JSONStringMap
Spec types.JSON `json:"spec"`
Labels types.JSONStringMap `json:"labels"`
Source string
Name string
Namespace string
Checks types.JSONStringMap `gorm:"-"`
CreatedAt time.Time
UpdatedAt time.Time
UpdatedAt time.Time `json:"updated_at"`
DeletedAt *time.Time `json:"deleted_at,omitempty" time_format:"postgres_timestamp"`
}

Expand Down
72 changes: 61 additions & 11 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/kommons"
"github.com/go-logr/logr"
jsontime "github.com/liamylian/jsontime/v2/v2"
"github.com/nsf/jsondiff"
"github.com/patrickmn/go-cache"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -39,20 +42,23 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

var json = jsontime.ConfigWithCustomTimeFormat

// CanaryReconciler reconciles a Canary object
type CanaryReconciler struct {
IncludeCheck string
IncludeNamespaces []string
LogPass, LogFail bool
client.Client
Kubernetes kubernetes.Interface
Kommons *kommons.Client
Log logr.Logger
Scheme *runtime.Scheme
Events record.EventRecorder
Cron *cron.Cron
RunnerName string
Done chan *pkg.CheckResult
Kubernetes kubernetes.Interface
Kommons *kommons.Client
Log logr.Logger
Scheme *runtime.Scheme
Events record.EventRecorder
Cron *cron.Cron
RunnerName string
Done chan *pkg.CheckResult
CanaryCache *cache.Cache
}

const FinalizerName = "canary.canaries.flanksource.com"
Expand Down Expand Up @@ -95,13 +101,13 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
return ctrl.Result{}, r.Update(ctx, canary)
}

dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID())
dbCanary, err := r.updateCanaryInDB(canary)
if err != nil {
return ctrl.Result{Requeue: true}, err
}

// Sync jobs if canary is created or updated
if canary.Generation == 1 || changed {
if canary.Generation == 1 {
if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
logger.Error(err, "failed to sync canary job")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
Expand All @@ -117,7 +123,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
}
patch := client.MergeFrom(canaryForStatus.DeepCopy())

canaryForStatus.Status.Checks = checks
canaryForStatus.Status.Checks = dbCanary.Checks
canaryForStatus.Status.ObservedGeneration = canary.Generation
if err = r.Status().Patch(ctx, &canaryForStatus, patch); err != nil {
logger.Error(err, "failed to update status for canary")
Expand All @@ -133,6 +139,50 @@ func (r *CanaryReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary, error) {
dbCanary, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID())
if err != nil {
return nil, err
}
r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration)

if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
return nil, err
}
return dbCanary, nil
}

func (r *CanaryReconciler) updateCanaryInDB(canary *v1.Canary) (*pkg.Canary, error) {
var dbCanary *pkg.Canary
var err error

// Get DBCanary from cache if exists else persist in database and update cache
if cacheObj, exists := r.CanaryCache.Get(canary.GetPersistedID()); !exists {
dbCanary, err = r.persistAndCacheCanary(canary)
if err != nil {
return nil, err
}
} else {
dbCanary = cacheObj.(*pkg.Canary)
}

// Compare canary spec and spec in database
// If they do not match, persist the canary in database
canarySpecJSON, err := json.Marshal(canary.Spec)
if err != nil {
return nil, err
}
opts := jsondiff.DefaultJSONOptions()
if diff, _ := jsondiff.Compare(canarySpecJSON, dbCanary.Spec, &opts); diff != jsondiff.FullMatch {
dbCanary, err = r.persistAndCacheCanary(canary)
if err != nil {
return nil, err
}
}

return dbCanary, nil
}

func (r *CanaryReconciler) Report() {
for payload := range canaryJobs.CanaryStatusChannel {
var canary v1.Canary
Expand Down
57 changes: 24 additions & 33 deletions pkg/db/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
)

func GetAllCanariesForSync() ([]pkg.Canary, error) {
var _canaries []pkg.Canary
var rawCanaries interface{}
query := `
SELECT json_agg(
jsonb_set_lax(to_jsonb(canaries),'{checks}', (
Expand All @@ -43,24 +41,20 @@ func GetAllCanariesForSync() ([]pkg.Canary, error) {
agent_id = '00000000-0000-0000-0000-000000000000'
`

rows, err := Gorm.Raw(query).Rows()
rows, err := Pool.Query(context.Background(), query)
if err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
defer rows.Close()

var _canaries []pkg.Canary
for rows.Next() {
if err := rows.Scan(&rawCanaries); err != nil {
return nil, err
if rows.RawValues()[0] == nil {
continue
}

if err := json.Unmarshal(rows.RawValues()[0], &_canaries); err != nil {
return nil, fmt.Errorf("failed to unmarshal canaries:%w for %s", err, rows.RawValues()[0])
}
}
if rawCanaries == nil {
return nil, nil
}
if err := json.Unmarshal(rawCanaries.([]byte), &_canaries); err != nil {
return nil, err
}
return _canaries, nil
}
Expand Down Expand Up @@ -294,22 +288,20 @@ func CreateCheck(canary pkg.Canary, check *pkg.Check) error {
return Gorm.Create(&check).Error
}

func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool, error) {
var err error
changed := false
tx := Gorm.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "agent_id"}, {Name: "name"}, {Name: "namespace"}, {Name: "source"}},
DoUpdates: clause.AssignmentColumns([]string{"labels", "spec"}),
}).Create(&model)
if tx.RowsAffected > 0 {
changed = true
}
func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, error) {
err := Gorm.Clauses(
clause.OnConflict{
Columns: []clause.Column{{Name: "agent_id"}, {Name: "name"}, {Name: "namespace"}, {Name: "source"}},
DoUpdates: clause.AssignmentColumns([]string{"labels", "spec"}),
},
clause.Returning{},
).Create(&model).Error

// Duplicate key happens when an already created canary is persisted
// We will ignore this error but act on other errors
if err != nil {
if !errors.Is(tx.Error, gorm.ErrDuplicatedKey) {
return nil, map[string]string{}, changed, tx.Error
if !errors.Is(err, gorm.ErrDuplicatedKey) {
return nil, err
}
}

Expand All @@ -322,12 +314,12 @@ func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool,
Error
if err != nil {
logger.Errorf("Error fetching existing checks for canary:%s", model.ID)
return nil, nil, changed, err
return nil, err
}

var spec v1.CanarySpec
if err = json.Unmarshal(model.Spec, &spec); err != nil {
return nil, nil, changed, err
return nil, err
}

var checks = make(map[string]string)
Expand Down Expand Up @@ -355,14 +347,13 @@ func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool,
}

model.Checks = checks
return &model, checks, changed, nil
return &model, nil
}

func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]string, bool, error) {
changed := false
func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, error) {
model, err := pkg.CanaryFromV1(canary)
if err != nil {
return nil, nil, changed, err
return nil, err
}
if canary.GetPersistedID() != "" {
model.ID, _ = uuid.Parse(canary.GetPersistedID())
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/canary_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func CreateComponentCanaryFromInline(id, name, namespace, schedule, owner string
},
Spec: *spec,
}
canary, _, _, err := PersistCanary(obj, fmt.Sprintf("component/%s", id))
canary, err := PersistCanary(obj, fmt.Sprintf("component/%s", id))
if err != nil {
logger.Debugf("error persisting component inline canary: %v", err)
return nil, err
Expand Down
64 changes: 1 addition & 63 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/models"
dutyTypes "github.com/flanksource/duty/types"
"github.com/flanksource/kommons"
"github.com/google/go-cmp/cmp"
"github.com/robfig/cron/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -273,7 +271,7 @@ func ScanCanaryConfigs() {
}

for _, canary := range configs {
_, _, _, err := db.PersistCanary(canary, path.Base(configfile))
_, err := db.PersistCanary(canary, path.Base(configfile))
if err != nil {
logger.Errorf("could not persist %s: %v", canary.Name, err)
} else {
Expand Down Expand Up @@ -384,66 +382,6 @@ func DeleteCanaryJob(id string) {
CanaryScheduler.Remove(entry.ID)
}

func ReconcileCanaryChecks() {
logger.Infof("Reconciling Canary Checks")

jobHistory := models.NewJobHistory("ReconcileCanaryChecks", "", "").Start()
logIfError(db.PersistJobHistory(jobHistory), "failed to persist job history [ReconcileCanaryChecks start]")
defer func() {
logIfError(db.PersistJobHistory(jobHistory.End()), "failed to persist job history [ReconcileCanaryChecks end]")
}()

canaries, err := db.GetAllCanariesForSync()
if err != nil {
logger.Errorf("Error fetching canaries: %v", err)
jobHistory.AddError(err.Error())
return
}
canaryCheckMapping := make(map[string]dutyTypes.JSONStringMap)
for _, c := range canaries {
canaryCheckMapping[c.ID.String()] = c.Checks
}

var rows []struct {
CanaryID string
Checks dutyTypes.JSONStringMap
}
db.Gorm.Raw(`
SELECT json_object_agg(checks.name, checks.id) as checks, canary_id
FROM checks
WHERE
deleted_at IS NULL AND
agent_id = '00000000-0000-0000-0000-000000000000'
GROUP BY canary_id
`).Scan(&rows)

var idsToPersist []string
for _, r := range rows {
if checks, exists := canaryCheckMapping[r.CanaryID]; exists {
if !cmp.Equal(r.Checks, checks) {
idsToPersist = append(idsToPersist, r.CanaryID)
}
} else {
// If the canaryID is not found in map, that means
// check is not deleted but the canary is
logger.Errorf("Canary[%s] is marked deleted but has active checks")
}
}

var canariesToPersist []pkg.Canary
if err := db.Gorm.Table("canaries").Where("id IN ?", idsToPersist).Find(&canariesToPersist).Error; err != nil {
logger.Errorf("Error fetching canaries: %v", err)
return
}

for _, c := range canaries {
if _, _, _, err := db.PersistCanaryModel(c); err != nil {
logger.Errorf("Error persisting canary: %v", err)
jobHistory.AddError(err.Error())
}
}
}

func ScheduleFunc(schedule string, fn func()) (interface{}, error) {
return FuncScheduler.AddFunc(schedule, fn)
}
Expand Down
Loading

0 comments on commit 73fe256

Please sign in to comment.