diff --git a/.sprinkler.yaml b/.sprinkler.yaml index 9a8994a..831ec62 100644 --- a/.sprinkler.yaml +++ b/.sprinkler.yaml @@ -18,6 +18,7 @@ control: scheduler: interval: "1s" + lockTimeout: "1h" orchard: address: "http://ws:8082" # apiKeyName: "x-api-key" diff --git a/cmd/scheduler.go b/cmd/scheduler.go index a1a03b7..dfdd8f7 100644 --- a/cmd/scheduler.go +++ b/cmd/scheduler.go @@ -14,7 +14,9 @@ import ( ) type SchedulerCmdOpt struct { - Interval time.Duration + Interval time.Duration + LockTimeout time.Duration + OrchardAddress string OrchardAPIKeyName string OrchardAPIKey string @@ -26,6 +28,7 @@ func getSchedulerCmdOpt() SchedulerCmdOpt { OrchardAddress: viper.GetString("scheduler.orchard.address"), OrchardAPIKeyName: viper.GetString("scheduler.orchard.apiKeyName"), OrchardAPIKey: viper.GetString("scheduler.orchard.apiKey"), + LockTimeout: viper.GetDuration("scheduler.lockTimeout"), } } @@ -47,6 +50,7 @@ to quickly create a Cobra application.`, OrchardHost: schedulerCmdOpt.OrchardAddress, OrchardAPIKeyName: schedulerCmdOpt.OrchardAPIKeyName, OrchardAPIKey: schedulerCmdOpt.OrchardAPIKey, + LockTimeout: schedulerCmdOpt.LockTimeout, } scheduler.Start() }, @@ -82,4 +86,11 @@ func init() { "api key to orchard service", ) viper.BindPFlag("scheduler.orchard.apiKey", schedulerCmd.Flags().Lookup("orchard")) + + schedulerCmd.Flags().Duration( + "lockTimeout", + time.Hour, + "Workflow schedule and activation lock TTL", + ) + viper.BindPFlag("scheduler.lockTimeout", schedulerCmd.Flags().Lookup("lockTimeout")) } diff --git a/service/scheduler.go b/service/scheduler.go index a4ff539..8852899 100644 --- a/service/scheduler.go +++ b/service/scheduler.go @@ -55,6 +55,7 @@ func (s ScheduleStatus) ToString() string { type Scheduler struct { Interval time.Duration + LockTimeout time.Duration MaxSize uint OrchardHost string OrchardAPIKeyName string @@ -66,11 +67,24 @@ func (s *Scheduler) Start() { tick := time.Tick(s.Interval) for range tick { fmt.Println("tick") + s.deleteExpiredLocks(database.GetInstance()) s.scheduleWorkflows(database.GetInstance()) s.activateWorkflows(database.GetInstance()) } } +func (s *Scheduler) deleteExpiredLocks(db *gorm.DB) { + expiryTime := time.Now().Add(-s.LockTimeout) + + db.Model(&table.WorkflowActivatorLock{}). + Where("lock_time < ?", expiryTime). + Delete(&table.WorkflowActivatorLock{}) + + db.Model(&table.WorkflowSchedulerLock{}). + Where("lock_time < ?", expiryTime). + Delete(&table.WorkflowSchedulerLock{}) +} + func (s *Scheduler) scheduleWorkflows(db *gorm.DB) { var workflows []table.Workflow @@ -187,6 +201,10 @@ func (s *Scheduler) lockAndCreate(db *gorm.DB, wf table.Workflow) { return } + // release the lock + defer db.Where("workflow_id = ? and token = ?", wf.ID, token). + Delete(&table.WorkflowSchedulerLock{}) + fmt.Println("creating workflow", wf.Name, token) client := &orchard.OrchardRestClient{ Host: s.OrchardHost, @@ -221,10 +239,6 @@ func (s *Scheduler) lockAndCreate(db *gorm.DB, wf table.Workflow) { } return nil }) - - // release the lock - db.Where("workflow_id = ? and token = ?", wf.ID, token). - Delete(&table.WorkflowSchedulerLock{}) } func (s *Scheduler) lockAndActivate(db *gorm.DB, swf table.ScheduledWorkflow) { @@ -249,6 +263,10 @@ func (s *Scheduler) lockAndActivate(db *gorm.DB, swf table.ScheduledWorkflow) { return } + // release the lock + defer db.Where("scheduled_id = ? and token = ?", swf.ID, token). + Delete(&table.WorkflowActivatorLock{}) + fmt.Println("activating workflow", swf.OrchardID, token) client := &orchard.OrchardRestClient{ Host: s.OrchardHost, @@ -268,10 +286,6 @@ func (s *Scheduler) lockAndActivate(db *gorm.DB, swf table.ScheduledWorkflow) { } return nil }) - - // release the lock - db.Where("scheduled_id = ? and token = ?", swf.ID, token). - Delete(&table.WorkflowActivatorLock{}) } func notifyOwner(wf table.Workflow, orchardErr error) {