Skip to content

Commit

Permalink
refactor: canary deletion handling
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Jul 27, 2023
1 parent 8d36093 commit 93514ae
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (

"github.com/flanksource/canary-checker/api/external"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg/db/types"
"github.com/flanksource/canary-checker/pkg/labels"
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/console"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/types"
"github.com/google/uuid"
"github.com/lib/pq"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
}

if !canary.DeletionTimestamp.IsZero() {
if err := db.DeleteCanary(*canary); err != nil {
if err := db.DeleteCanary(canary.GetPersistedID(), canary.DeletionTimestamp.Time); err != nil {
logger.Error(err, "failed to delete canary")
}
canaryJobs.DeleteCanaryJob(canary.GetPersistedID())
controllerutil.RemoveFinalizer(canary, FinalizerName)
return ctrl.Result{}, r.Update(ctx, canary)
}

dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+string(canary.ObjectMeta.UID))
dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID())
if err != nil {
return ctrl.Result{Requeue: true}, err
}
Expand Down
48 changes: 25 additions & 23 deletions pkg/db/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/models"
dutyTypes "github.com/flanksource/duty/types"
"github.com/google/uuid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -182,37 +183,38 @@ func RemoveTransformedChecks(ids []string) error {
Error
}

func DeleteCanary(canary v1.Canary) error {
logger.Infof("deleting canary %s/%s", canary.Namespace, canary.Name)
model, err := pkg.CanaryFromV1(canary)
if err != nil {
return err
}
deleteTime := time.Now()
persistedID := canary.GetPersistedID()
var checkIDs []string
for _, checkID := range canary.Status.Checks {
checkIDs = append(checkIDs, checkID)
}
metrics.UnregisterGauge(checkIDs)
if persistedID == "" {
logger.Errorf("Canary %s/%s has not been persisted", canary.Namespace, canary.Name)
return nil
}
if err := Gorm.Where("id = ?", persistedID).Find(&model).UpdateColumn("deleted_at", deleteTime).Error; err != nil {
func DeleteCanary(id string, deleteTime time.Time) error {
logger.Infof("Deleting canary[%s]", id)

if err := Gorm.Table("canaries").Where("id = ?", id).UpdateColumn("deleted_at", deleteTime).Error; err != nil {
return err
}
if err := DeleteChecksForCanary(persistedID, deleteTime); err != nil {
checkIDs, err := DeleteChecksForCanary(id, deleteTime)
if err != nil {
return err
}
if err := DeleteCheckComponentRelationshipsForCanary(persistedID, deleteTime); err != nil {
metrics.UnregisterGauge(checkIDs)

if err := DeleteCheckComponentRelationshipsForCanary(id, deleteTime); err != nil {
return err
}
return nil
}

func DeleteChecksForCanary(id string, deleteTime time.Time) error {
return Gorm.Table("checks").Where("canary_id = ? and deleted_at is null", id).UpdateColumn("deleted_at", deleteTime).Error
// DeleteChecksForCanary soft-deletes the checks of the canary identified by
// id: every check row with a matching canary_id whose deleted_at is still NULL
// has deleted_at set to deleteTime.
//
// It returns the IDs of the checks collected from the UPDATE's RETURNING
// clause. If the update fails, it returns a nil slice together with the error
// so callers never act on a partial result.
func DeleteChecksForCanary(id string, deleteTime time.Time) ([]string, error) {
	var checks []pkg.Check
	err := Gorm.Model(&checks).
		Table("checks").
		// Ask the database to hand back the id of each updated row so the
		// caller knows exactly which checks were affected.
		Clauses(clause.Returning{Columns: []clause.Column{{Name: "id"}}}).
		Where("canary_id = ? and deleted_at IS NULL", id).
		UpdateColumn("deleted_at", deleteTime).
		Error
	if err != nil {
		return nil, err
	}

	checkIDs := make([]string, 0, len(checks))
	for _, c := range checks {
		checkIDs = append(checkIDs, c.ID.String())
	}
	return checkIDs, nil
}

func DeleteCheckComponentRelationshipsForCanary(id string, deleteTime time.Time) error {
Expand Down Expand Up @@ -285,7 +287,7 @@ func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, err
func CreateCanary(canary *pkg.Canary) error {
if canary.Spec == nil || len(canary.Spec) == 0 {
empty := []byte("{}")
canary.Spec = types.JSON(empty)
canary.Spec = dutyTypes.JSON(types.JSON(empty))
}

return Gorm.Create(canary).Error
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func DeleteInlineCanariesForComponent(componentID string, deleteTime time.Time)
return err
}
for _, c := range canaries {
if err := DeleteChecksForCanary(c.ID.String(), deleteTime); err != nil {
if _, err := DeleteChecksForCanary(c.ID.String(), deleteTime); err != nil {
logger.Debugf("Error deleting checks for canary %v", c.ID)
continue
}
Expand Down
54 changes: 36 additions & 18 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/models"
dutyTypes "github.com/flanksource/duty/types"
"github.com/flanksource/kommons"
"github.com/google/go-cmp/cmp"
"github.com/robfig/cron/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -366,42 +368,58 @@ func DeleteCanaryJob(id string) {
}

func ReconcileCanaryChecks() {
logger.Infof("Reconciling Canary Checks")
jobHistory := models.NewJobHistory("ReconcileCanaryChecks", "", "").Start()
_ = db.PersistJobHistory(jobHistory)
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

// Get id, updateTime of all non deleted canaries
var rows []struct {
ID string
UpdatedAt time.Time
}
if err := db.Gorm.Raw(`SELECT id, updated_at FROM canaries WHERE deleted_at IS NULL`).Scan(&rows); err != nil {
canaries, err := db.GetAllCanariesForSync()
if err != nil {
logger.Errorf("Error fetching canaries: %v", err)
jobHistory.AddError(err.Error())
return
}
canaryCheckMapping := make(map[string]dutyTypes.JSONStringMap)
for _, c := range canaries {
canaryCheckMapping[c.ID.String()] = c.Checks
}

var rows []struct {
CanaryID string
Checks dutyTypes.JSONStringMap
}
db.Gorm.Raw(`
SELECT json_object_agg(checks.name, checks.id) as checks, canary_id
FROM checks
WHERE
deleted_at IS NULL AND
agent_id = '00000000-0000-0000-0000-000000000000'
GROUP BY canary_id
`).Scan(&rows)

var idsToPersist []string
for _, r := range rows {
updateTime, exists := canaryUpdateTimeCache.Load(r.ID)
if !exists {
idsToPersist = append(idsToPersist, r.ID)
canaryUpdateTimeCache.Store(r.ID, r.UpdatedAt)
continue
}
if r.UpdatedAt.After(updateTime.(time.Time)) {
idsToPersist = append(idsToPersist, r.ID)
if checks, exists := canaryCheckMapping[r.CanaryID]; exists {
if !cmp.Equal(r.Checks, checks) {
idsToPersist = append(idsToPersist, r.CanaryID)
}
} else {
// If the canaryID is not found in map, that means
// check is not deleted but the canary is
logger.Errorf("Canary[%s] is marked deleted but has active checks")
}
}

var canaries []pkg.Canary
if err := db.Gorm.Table("canaries").Where("id IN ?", idsToPersist).Find(&canaries).Error; err != nil {
var canariesToPersist []pkg.Canary
if err := db.Gorm.Table("canaries").Where("id IN ?", idsToPersist).Find(&canariesToPersist).Error; err != nil {
logger.Errorf("Error fetching canaries: %v", err)
return
}

for _, c := range canaries {
if _, _, _, err := db.PersistCanaryModel(c); err != nil {
logger.Errorf("Error persisting canary: %v", err)
jobHistory.AddError(err.Error())
}
}
}
Expand All @@ -415,7 +433,7 @@ func ReconcileDeletedCanaryChecks() {
ID string
DeletedAt time.Time
}
// Select all components whose topology ID is deleted but their deleted at is not marked
// Select all canaries that are marked deleted but still have checks whose deleted_at is not set
err := db.Gorm.Raw(`
SELECT DISTINCT(canaries.id), canaries.deleted_at
FROM canaries
Expand All @@ -432,7 +450,7 @@ func ReconcileDeletedCanaryChecks() {
}

for _, r := range rows {
if err := db.DeleteChecksForCanary(r.ID, r.DeletedAt); err != nil {
if err := db.DeleteCanary(r.ID, r.DeletedAt); err != nil {
logger.Errorf("Error deleting checks for canary[%s]: %v", r.ID, err)
jobHistory.AddError(err.Error())
}
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func Start() {
logger.Errorf("Failed to schedule ReconcileCanaryChecks: %v", err)
}

canaryJobs.ReconcileCanaryChecks()
canaryJobs.CleanupMetricsGauges()
canaryJobs.SyncCanaryJobs()
systemJobs.SyncTopologyJobs()
Expand Down
12 changes: 10 additions & 2 deletions pkg/topology/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,21 @@ func lookupConfig(ctx *ComponentContext, property *v1.Property) (*pkg.Property,
}

templateEnv := map[string]any{
"config": _config.Spec.ToMapStringAny(),
"tags": _config.Tags.ToMapStringAny(),
"config": _config.Spec,
"tags": toMapStringAny(_config.Tags),
}
prop.Text, err = templating.Template(templateEnv, property.ConfigLookup.Display.Template)
return prop, err
}

// toMapStringAny copies a string-valued map into a map[string]any, keeping
// every key and storing each value unchanged as an any.
//
// The result is always a non-nil map, even when m is nil or empty, and is
// sized from len(m) up front so it does not have to grow while copying.
func toMapStringAny(m map[string]string) map[string]any {
	out := make(map[string]any, len(m))
	for key, value := range m {
		out[key] = value
	}
	return out
}

func lookupProperty(ctx *ComponentContext, property *v1.Property) (pkg.Properties, error) {
prop := pkg.NewProperty(*property)

Expand Down

0 comments on commit 93514ae

Please sign in to comment.