From be2187b36b6bdfb640b2486889ad8adcf7bce14e Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 25 Oct 2024 11:48:50 +0545 Subject: [PATCH] feat: wait for checks before shutdown [skip ci] --- canary-checker.properties | 1 + cmd/operator.go | 7 +++++++ cmd/serve.go | 7 +++++++ go.mod | 2 +- go.sum | 2 -- pkg/jobs/canary/sync.go | 26 ++++++++++++++++++++++++++ 6 files changed, 42 insertions(+), 3 deletions(-) diff --git a/canary-checker.properties b/canary-checker.properties index 7554ec754..e8295057a 100644 --- a/canary-checker.properties +++ b/canary-checker.properties @@ -6,5 +6,6 @@ # topology.runNow=true log.level.db=warn +# check.concurrency=100 # jobs.ComponentRelationshipSync.runNow=true diff --git a/cmd/operator.go b/cmd/operator.go index e9091f9a2..6e4a14c01 100644 --- a/cmd/operator.go +++ b/cmd/operator.go @@ -8,6 +8,7 @@ import ( apicontext "github.com/flanksource/canary-checker/api/context" "github.com/flanksource/canary-checker/pkg/cache" "github.com/flanksource/canary-checker/pkg/jobs" + "github.com/flanksource/canary-checker/pkg/jobs/canary" canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary" "github.com/flanksource/canary-checker/pkg/runner" "github.com/flanksource/canary-checker/pkg/utils" @@ -96,6 +97,12 @@ func run() error { // so we use a goroutine to unblock server start // to prevent health check from failing go jobs.Start() + + // TODO: stop the cron scheduler so that no more checks are scheduled + + shutdown.AddHookWithPriority("check jobs", shutdown.PriorityJobs, func() { + canary.AcquireAllCheckLocks(ctx) + }) } go serve() diff --git a/cmd/serve.go b/cmd/serve.go index e6d94194f..f8b4e2b8f 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -16,6 +16,7 @@ import ( "github.com/flanksource/canary-checker/pkg/db" "github.com/flanksource/canary-checker/pkg/echo" "github.com/flanksource/canary-checker/pkg/jobs" + "github.com/flanksource/canary-checker/pkg/jobs/canary" canaryJobs 
"github.com/flanksource/canary-checker/pkg/jobs/canary" echov4 "github.com/labstack/echo/v4" @@ -49,6 +50,12 @@ var Serve = &cobra.Command{ canaryJobs.StartScanCanaryConfigs(apicontext.DefaultContext, dataFile, configFiles) if executor { jobs.Start() + + // TODO: stop the cron scheduler so that no more checks are scheduled + + shutdown.AddHookWithPriority("check jobs", shutdown.PriorityJobs, func() { + canary.AcquireAllCheckLocks(apicontext.DefaultContext) + }) } serve() diff --git a/go.mod b/go.mod index 607c5fb65..3fa75d75d 100644 --- a/go.mod +++ b/go.mod @@ -327,7 +327,7 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) -// replace github.com/flanksource/duty => ../duty +replace github.com/flanksource/duty => ../duty // replace github.com/flanksource/artifacts => ../artifacts diff --git a/go.sum b/go.sum index f9478a082..151249778 100644 --- a/go.sum +++ b/go.sum @@ -861,8 +861,6 @@ github.com/flanksource/artifacts v1.0.15 h1:3ImJr2y0ZCXw/QrMhfJJktAT7pYD3sMZR5ix github.com/flanksource/artifacts v1.0.15/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70= github.com/flanksource/commons v1.30.5 h1:p8PXGiNt7SurBBh9K3ea8/ZrDvacXSYHJSs/cqJLDK8= github.com/flanksource/commons v1.30.5/go.mod h1:26zdVkmMPsGpvfcsvst5WgsqcyRL8KqFNxkumagBN+A= -github.com/flanksource/duty v1.0.727 h1:5f7mntZjtg4bvVCzLdIe2cwT8DnFtKYToq1H74T7tfo= -github.com/flanksource/duty v1.0.727/go.mod h1:sZY2NytdenrkqXoMD6Gn2C8xH6dm5HsqOeE0p74Z2VE= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.24.39 h1:O763lnNIcTELSMYeIO0dNDfcb3LoZvzU1fr62I4Yxqg= github.com/flanksource/gomplate/v3 v3.24.39/go.mod h1:0wY/+UPvd7CxmiTBNmzZdWIEOUZAsRkpGY1j5R711O8= diff --git a/pkg/jobs/canary/sync.go b/pkg/jobs/canary/sync.go index 77f53ad6d..83497118a 100644 --- a/pkg/jobs/canary/sync.go +++ b/pkg/jobs/canary/sync.go @@ -19,8 +19,29 @@ import ( "github.com/flanksource/duty/job" 
"github.com/flanksource/duty/models" "github.com/robfig/cron/v3" + "golang.org/x/sync/semaphore" ) +const propertyCheckConcurrency = "check.concurrency" + +var ( + // The maximum number of checks that can run concurrently + defaultCheckConcurrency = 50 + + // Holds a lock for every running check. + // Can be overwritten by 'check.concurrency' property. + globalCheckSemaphore *semaphore.Weighted +) + +// AcquireAllCheckLocks blocks until the global check semaphore is fully acquired. +// +// This helps to ensure that no checks are currently running. +func AcquireAllCheckLocks(ctx context.Context) { + ctx.Logger.V(6).Infof("acquiring all check locks") + globalCheckSemaphore.Acquire(ctx, int64(ctx.Properties().Int(propertyCheckConcurrency, defaultCheckConcurrency))) + ctx.Logger.V(6).Infof("acquired all check locks") +} + var canaryJobs sync.Map const DefaultCanarySchedule = "@every 5m" @@ -140,6 +161,7 @@ func newCanaryJob(c CanaryJob) { IgnoreSuccessHistory: true, Retention: job.RetentionBalanced, ResourceID: c.DBCanary.ID.String(), + Semaphores: []*semaphore.Weighted{globalCheckSemaphore}, ResourceType: "canary", ID: fmt.Sprintf("%s/%s", c.Canary.Namespace, c.Canary.Name), Fn: c.Run, @@ -159,6 +181,10 @@ var SyncCanaryJobs = &job.Job{ Schedule: "@every 5m", Retention: job.RetentionFew, Fn: func(ctx job.JobRuntime) error { + if globalCheckSemaphore == nil { + globalCheckSemaphore = semaphore.NewWeighted(int64(ctx.Properties().Int(propertyCheckConcurrency, defaultCheckConcurrency))) + } + canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace) if err != nil { return err