Skip to content

Commit

Permalink
Merge pull request #1174 from flanksource/persist-ui
Browse files Browse the repository at this point in the history
fix: sync checks for canaries added from ui
  • Loading branch information
moshloop authored Jul 29, 2023
2 parents a81c4e7 + 510e990 commit afcd531
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pkg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (

"github.com/flanksource/canary-checker/api/external"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg/db/types"
"github.com/flanksource/canary-checker/pkg/labels"
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/console"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/types"
"github.com/google/uuid"
"github.com/lib/pq"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c
}

if !canary.DeletionTimestamp.IsZero() {
if err := db.DeleteCanary(*canary); err != nil {
if err := db.DeleteCanary(canary.GetPersistedID(), canary.DeletionTimestamp.Time); err != nil {
logger.Error(err, "failed to delete canary")
}
canaryJobs.DeleteCanaryJob(canary.GetPersistedID())
controllerutil.RemoveFinalizer(canary, FinalizerName)
return ctrl.Result{}, r.Update(ctx, canary)
}

dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+string(canary.ObjectMeta.UID))
dbCanary, checks, changed, err := db.PersistCanary(*canary, "kubernetes/"+canary.GetPersistedID())
if err != nil {
return ctrl.Result{Requeue: true}, err
}
Expand Down
87 changes: 49 additions & 38 deletions pkg/db/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/models"
dutyTypes "github.com/flanksource/duty/types"
"github.com/google/uuid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -182,37 +183,38 @@ func RemoveTransformedChecks(ids []string) error {
Error
}

func DeleteCanary(canary v1.Canary) error {
logger.Infof("deleting canary %s/%s", canary.Namespace, canary.Name)
model, err := pkg.CanaryFromV1(canary)
if err != nil {
return err
}
deleteTime := time.Now()
persistedID := canary.GetPersistedID()
var checkIDs []string
for _, checkID := range canary.Status.Checks {
checkIDs = append(checkIDs, checkID)
}
metrics.UnregisterGauge(checkIDs)
if persistedID == "" {
logger.Errorf("Canary %s/%s has not been persisted", canary.Namespace, canary.Name)
return nil
}
if err := Gorm.Where("id = ?", persistedID).Find(&model).UpdateColumn("deleted_at", deleteTime).Error; err != nil {
func DeleteCanary(id string, deleteTime time.Time) error {
logger.Infof("Deleting canary[%s]", id)

if err := Gorm.Table("canaries").Where("id = ?", id).UpdateColumn("deleted_at", deleteTime).Error; err != nil {
return err
}
if err := DeleteChecksForCanary(persistedID, deleteTime); err != nil {
checkIDs, err := DeleteChecksForCanary(id, deleteTime)
if err != nil {
return err
}
if err := DeleteCheckComponentRelationshipsForCanary(persistedID, deleteTime); err != nil {
metrics.UnregisterGauge(checkIDs)

if err := DeleteCheckComponentRelationshipsForCanary(id, deleteTime); err != nil {
return err
}
return nil
}

func DeleteChecksForCanary(id string, deleteTime time.Time) error {
return Gorm.Table("checks").Where("canary_id = ? and deleted_at is null", id).UpdateColumn("deleted_at", deleteTime).Error
func DeleteChecksForCanary(id string, deleteTime time.Time) ([]string, error) {
var checkIDs []string
var checks []pkg.Check
err := Gorm.Model(&checks).
Table("checks").
Clauses(clause.Returning{Columns: []clause.Column{{Name: "id"}}}).
Where("canary_id = ? and deleted_at IS NULL", id).
UpdateColumn("deleted_at", deleteTime).
Error

for _, c := range checks {
checkIDs = append(checkIDs, c.ID.String())
}
return checkIDs, err
}

func DeleteCheckComponentRelationshipsForCanary(id string, deleteTime time.Time) error {
Expand Down Expand Up @@ -285,7 +287,7 @@ func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, err
func CreateCanary(canary *pkg.Canary) error {
if canary.Spec == nil || len(canary.Spec) == 0 {
empty := []byte("{}")
canary.Spec = types.JSON(empty)
canary.Spec = dutyTypes.JSON(types.JSON(empty))
}

return Gorm.Create(canary).Error
Expand All @@ -295,16 +297,9 @@ func CreateCheck(canary pkg.Canary, check *pkg.Check) error {
return Gorm.Create(&check).Error
}

func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]string, bool, error) {
func PersistCanaryModel(model pkg.Canary) (*pkg.Canary, map[string]string, bool, error) {
var err error
changed := false
model, err := pkg.CanaryFromV1(canary)
if err != nil {
return nil, nil, changed, err
}
if canary.GetPersistedID() != "" {
model.ID, _ = uuid.Parse(canary.GetPersistedID())
}
model.Source = source
tx := Gorm.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "agent_id"}, {Name: "name"}, {Name: "namespace"}, {Name: "source"}},
UpdateAll: true,
Expand Down Expand Up @@ -333,15 +328,15 @@ func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]str
return nil, nil, changed, err
}

var spec v1.CanarySpec
if err = json.Unmarshal(model.Spec, &spec); err != nil {
return nil, nil, changed, err
}

var checks = make(map[string]string)
var newCheckIDs []string
for _, config := range canary.Spec.GetAllChecks() {
for _, config := range spec.GetAllChecks() {
check := pkg.FromExternalCheck(model, config)
// not creating the new check if already exists in the status
// status is not patched correctly with the status id
if checkID := canary.GetCheckID(check.Name); checkID != "" {
check.ID, _ = uuid.Parse(checkID)
}
check.Spec, _ = json.Marshal(config)
id, err := PersistCheck(check, model.ID)
if err != nil {
Expand All @@ -361,9 +356,25 @@ func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]str
}
metrics.UnregisterGauge(checkIDsToRemove)
}

model.Checks = checks
return &model, checks, changed, nil
}

func PersistCanary(canary v1.Canary, source string) (*pkg.Canary, map[string]string, bool, error) {
changed := false
model, err := pkg.CanaryFromV1(canary)
if err != nil {
return nil, nil, changed, err
}
if canary.GetPersistedID() != "" {
model.ID, _ = uuid.Parse(canary.GetPersistedID())
}
model.Source = source

return PersistCanaryModel(model)
}

func RefreshCheckStatusSummary() {
if err := duty.RefreshCheckStatusSummary(Pool); err != nil {
logger.Errorf("error refreshing check_status_summary materialized view: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func DeleteInlineCanariesForComponent(componentID string, deleteTime time.Time)
return err
}
for _, c := range canaries {
if err := DeleteChecksForCanary(c.ID.String(), deleteTime); err != nil {
if _, err := DeleteChecksForCanary(c.ID.String(), deleteTime); err != nil {
logger.Debugf("Error deleting checks for canary %v", c.ID)
continue
}
Expand Down
64 changes: 44 additions & 20 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/models"
dutyTypes "github.com/flanksource/duty/types"
"github.com/flanksource/kommons"
"github.com/google/go-cmp/cmp"
"github.com/robfig/cron/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -365,39 +367,61 @@ func DeleteCanaryJob(id string) {
CanaryScheduler.Remove(entry.ID)
}

func ReconcileDeletedCanaryChecks() {
jobHistory := models.NewJobHistory("ReconcileDeletedCanaryChecks", "", "").Start()
func ReconcileCanaryChecks() {
logger.Infof("Reconciling Canary Checks")
jobHistory := models.NewJobHistory("ReconcileCanaryChecks", "", "").Start()
_ = db.PersistJobHistory(jobHistory)
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

canaries, err := db.GetAllCanariesForSync()
if err != nil {
logger.Errorf("Error fetching canaries: %v", err)
jobHistory.AddError(err.Error())
return
}
canaryCheckMapping := make(map[string]dutyTypes.JSONStringMap)
for _, c := range canaries {
canaryCheckMapping[c.ID.String()] = c.Checks
}

var rows []struct {
ID string
DeletedAt time.Time
CanaryID string
Checks dutyTypes.JSONStringMap
}
// Select all components whose topology ID is deleted but their deleted at is not marked
err := db.Gorm.Raw(`
SELECT DISTINCT(canaries.id), canaries.deleted_at
FROM canaries
INNER JOIN checks ON canaries.id = checks.canary_id
db.Gorm.Raw(`
SELECT json_object_agg(checks.name, checks.id) as checks, canary_id
FROM checks
WHERE
checks.deleted_at IS NULL AND
canaries.deleted_at IS NOT NULL
`).Scan(&rows).Error
deleted_at IS NULL AND
agent_id = '00000000-0000-0000-0000-000000000000'
GROUP BY canary_id
`).Scan(&rows)

if err != nil {
logger.Errorf("Error fetching deleted canary checks: %v", err)
jobHistory.AddError(err.Error())
var idsToPersist []string
for _, r := range rows {
if checks, exists := canaryCheckMapping[r.CanaryID]; exists {
if !cmp.Equal(r.Checks, checks) {
idsToPersist = append(idsToPersist, r.CanaryID)
}
} else {
// If the canaryID is not found in map, that means
// check is not deleted but the canary is
logger.Errorf("Canary[%s] is marked deleted but has active checks")
}
}

var canariesToPersist []pkg.Canary
if err := db.Gorm.Table("canaries").Where("id IN ?", idsToPersist).Find(&canariesToPersist).Error; err != nil {
logger.Errorf("Error fetching canaries: %v", err)
return
}

for _, r := range rows {
if err := db.DeleteChecksForCanary(r.ID, r.DeletedAt); err != nil {
logger.Errorf("Error deleting checks for canary[%s]: %v", r.ID, err)
for _, c := range canaries {
if _, _, _, err := db.PersistCanaryModel(c); err != nil {
logger.Errorf("Error persisting canary: %v", err)
jobHistory.AddError(err.Error())
}
DeleteCanaryJob(r.ID)
}
jobHistory.IncrSuccess()
}

func ScheduleFunc(schedule string, fn func()) (interface{}, error) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ const (
PrometheusGaugeCleanupSchedule = "@every 1h"

ReconcileDeletedTopologyComponentsSchedule = "@every 1h"
ReconcileDeletedCanaryChecksSchedule = "@every 1h"
)

func Start() {
Expand Down Expand Up @@ -85,10 +84,11 @@ func Start() {
if _, err := ScheduleFunc(ReconcileDeletedTopologyComponentsSchedule, systemJobs.ReconcileDeletedTopologyComponents); err != nil {
logger.Errorf("Failed to schedule ReconcileDeletedTopologyComponents: %v", err)
}
if _, err := ScheduleFunc(ReconcileDeletedCanaryChecksSchedule, canaryJobs.ReconcileDeletedCanaryChecks); err != nil {
logger.Errorf("Failed to schedule ReconcileDeletedCanaryChecks: %v", err)
if _, err := ScheduleFunc("@every 5m", canaryJobs.ReconcileCanaryChecks); err != nil {
logger.Errorf("Failed to schedule ReconcileCanaryChecks: %v", err)
}

canaryJobs.ReconcileCanaryChecks()
canaryJobs.CleanupMetricsGauges()
canaryJobs.SyncCanaryJobs()
systemJobs.SyncTopologyJobs()
Expand Down
12 changes: 10 additions & 2 deletions pkg/topology/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,21 @@ func lookupConfig(ctx *ComponentContext, property *v1.Property) (*pkg.Property,
}

templateEnv := map[string]any{
"config": _config.Spec.ToMapStringAny(),
"tags": _config.Tags.ToMapStringAny(),
"config": _config.Spec,
"tags": toMapStringAny(_config.Tags),
}
prop.Text, err = gomplate.RunTemplate(templateEnv, property.ConfigLookup.Display.Template.Gomplate())
return prop, err
}

func toMapStringAny(m map[string]string) map[string]any {
r := make(map[string]any)
for k, v := range m {
r[k] = v
}
return r
}

func lookupProperty(ctx *ComponentContext, property *v1.Property) (pkg.Properties, error) {
prop := pkg.NewProperty(*property)

Expand Down

0 comments on commit afcd531

Please sign in to comment.