Skip to content

Commit

Permalink
add workflow activation and scheduling lock TTL (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
garden-of-delete committed Mar 8, 2024
1 parent 88ec319 commit 7afd9f1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
1 change: 1 addition & 0 deletions .sprinkler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ control:

scheduler:
interval: "1s"
lockTimeout: "1h"
orchard:
address: "http://ws:8082"
# apiKeyName: "x-api-key"
Expand Down
13 changes: 12 additions & 1 deletion cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
)

type SchedulerCmdOpt struct {
Interval time.Duration
Interval time.Duration
LockTimeout time.Duration

OrchardAddress string
OrchardAPIKeyName string
OrchardAPIKey string
Expand All @@ -26,6 +28,7 @@ func getSchedulerCmdOpt() SchedulerCmdOpt {
OrchardAddress: viper.GetString("scheduler.orchard.address"),
OrchardAPIKeyName: viper.GetString("scheduler.orchard.apiKeyName"),
OrchardAPIKey: viper.GetString("scheduler.orchard.apiKey"),
LockTimeout: viper.GetDuration("scheduler.lockTimeout"),
}
}

Expand All @@ -47,6 +50,7 @@ to quickly create a Cobra application.`,
OrchardHost: schedulerCmdOpt.OrchardAddress,
OrchardAPIKeyName: schedulerCmdOpt.OrchardAPIKeyName,
OrchardAPIKey: schedulerCmdOpt.OrchardAPIKey,
LockTimeout: schedulerCmdOpt.LockTimeout,
}
scheduler.Start()
},
Expand Down Expand Up @@ -82,4 +86,11 @@ func init() {
"api key to orchard service",
)
viper.BindPFlag("scheduler.orchard.apiKey", schedulerCmd.Flags().Lookup("orchard"))

schedulerCmd.Flags().Duration(
"lockTimeout",
time.Hour,
"Workflow schedule and activation lock TTL",
)
viper.BindPFlag("scheduler.lockTimeout", schedulerCmd.Flags().Lookup("lockTimeout"))
}
30 changes: 22 additions & 8 deletions service/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (s ScheduleStatus) ToString() string {

type Scheduler struct {
Interval time.Duration
LockTimeout time.Duration
MaxSize uint
OrchardHost string
OrchardAPIKeyName string
Expand All @@ -66,11 +67,24 @@ func (s *Scheduler) Start() {
tick := time.Tick(s.Interval)
for range tick {
fmt.Println("tick")
s.deleteExpiredLocks(database.GetInstance())
s.scheduleWorkflows(database.GetInstance())
s.activateWorkflows(database.GetInstance())
}
}

func (s *Scheduler) deleteExpiredLocks(db *gorm.DB) {
expiryTime := time.Now().Add(-s.LockTimeout)

db.Model(&table.WorkflowActivatorLock{}).
Where("lock_time < ?", expiryTime).
Delete(&table.WorkflowActivatorLock{})

db.Model(&table.WorkflowSchedulerLock{}).
Where("lock_time < ?", expiryTime).
Delete(&table.WorkflowSchedulerLock{})
}

func (s *Scheduler) scheduleWorkflows(db *gorm.DB) {
var workflows []table.Workflow

Expand Down Expand Up @@ -187,6 +201,10 @@ func (s *Scheduler) lockAndCreate(db *gorm.DB, wf table.Workflow) {
return
}

// release the lock
defer db.Where("workflow_id = ? and token = ?", wf.ID, token).
Delete(&table.WorkflowSchedulerLock{})

fmt.Println("creating workflow", wf.Name, token)
client := &orchard.OrchardRestClient{
Host: s.OrchardHost,
Expand Down Expand Up @@ -221,10 +239,6 @@ func (s *Scheduler) lockAndCreate(db *gorm.DB, wf table.Workflow) {
}
return nil
})

// release the lock
db.Where("workflow_id = ? and token = ?", wf.ID, token).
Delete(&table.WorkflowSchedulerLock{})
}

func (s *Scheduler) lockAndActivate(db *gorm.DB, swf table.ScheduledWorkflow) {
Expand All @@ -249,6 +263,10 @@ func (s *Scheduler) lockAndActivate(db *gorm.DB, swf table.ScheduledWorkflow) {
return
}

// release the lock
defer db.Where("scheduled_id = ? and token = ?", swf.ID, token).
Delete(&table.WorkflowActivatorLock{})

fmt.Println("activating workflow", swf.OrchardID, token)
client := &orchard.OrchardRestClient{
Host: s.OrchardHost,
Expand All @@ -268,10 +286,6 @@ func (s *Scheduler) lockAndActivate(db *gorm.DB, swf table.ScheduledWorkflow) {
}
return nil
})

// release the lock
db.Where("scheduled_id = ? and token = ?", swf.ID, token).
Delete(&table.WorkflowActivatorLock{})
}

func notifyOwner(wf table.Workflow, orchardErr error) {
Expand Down

0 comments on commit 7afd9f1

Please sign in to comment.