From f342c06cf9752c709b2dd33f7ae1676267283a50 Mon Sep 17 00:00:00 2001 From: schowsf <94021776+schowsf@users.noreply.github.com> Date: Wed, 23 Aug 2023 11:01:43 -0700 Subject: [PATCH] support stagger start time when multiple workflows from orchard runner (#32) When the orchard runner returns multiple workflows, the schedule option `StaggerStartMinutes` (implemented in this patch as the field `ScheduleDelayMinutes`, JSON key `scheduleDelayMinutes`) allows activating each orchard workflow with a gap of the specified number of minutes. In the scheduler, the original combined create-and-activate method is decoupled into two methods. Create adds the stagger start minutes to the `start_time` of each successive workflow; this data goes to the `scheduled_workflows` table. The new, decoupled activate-workflow method runs on each time tick and activates a workflow once its `start_time` has been reached. The schema is updated, and the ws payload gains the optional field `StaggerStartMinutes` (`scheduleDelayMinutes` in the request JSON). --- database/schema.go | 1 + database/table/tables.go | 23 ++++++--- orchard/orchard_client.go | 6 +-- service/control.go | 36 +++++++------ service/scheduler.go | 106 ++++++++++++++++++++++++++++++++------ 5 files changed, 127 insertions(+), 45 deletions(-) diff --git a/database/schema.go b/database/schema.go index 5c0e62e..7e84c3e 100644 --- a/database/schema.go +++ b/database/schema.go @@ -17,6 +17,7 @@ var Tables = []interface{}{ &table.Workflow{}, &table.ScheduledWorkflow{}, &table.WorkflowSchedulerLock{}, + &table.WorkflowActivatorLock{}, } var owner = os.Getenv("OWNER_SNS") diff --git a/database/table/tables.go b/database/table/tables.go index f65c1e3..f366b1d 100644 --- a/database/table/tables.go +++ b/database/table/tables.go @@ -14,14 +14,15 @@ import ( type Workflow struct { gorm.Model - Name string `gorm:"type:varchar(256);not null;index:workflows_name,unique"` - Artifact string `gorm:"type:varchar(2048);not null"` - Command string `gorm:"type:text;not null"` - Every model.Every `gorm:"type:varchar(64);not null"` - NextRuntime time.Time `gorm:"not null"` - Backfill bool `gorm:"not null"` - Owner *string 
`gorm:"type:varchar(2048)"` - IsActive bool `gorm:"not null"` + Name string `gorm:"type:varchar(256);not null;index:workflows_name,unique"` + Artifact string `gorm:"type:varchar(2048);not null"` + Command string `gorm:"type:text;not null"` + Every model.Every `gorm:"type:varchar(64);not null"` + NextRuntime time.Time `gorm:"not null"` + Backfill bool `gorm:"not null"` + Owner *string `gorm:"type:varchar(2048)"` + IsActive bool `gorm:"not null"` + ScheduleDelayMinutes uint `gorm:"default:0"` ScheduledWorkflows []ScheduledWorkflow } @@ -40,3 +41,9 @@ type WorkflowSchedulerLock struct { Token string `gorm:"type:varchar(64);not null"` LockTime time.Time `gorm:"not null"` } + +type WorkflowActivatorLock struct { + ScheduledID uint `gorm:"primaryKey"` + Token string `gorm:"type:varchar(64);not null"` + LockTime time.Time `gorm:"not null"` +} diff --git a/orchard/orchard_client.go b/orchard/orchard_client.go index 7d9fb51..221111d 100644 --- a/orchard/orchard_client.go +++ b/orchard/orchard_client.go @@ -107,15 +107,15 @@ func (c OrchardRestClient) Delete(orchardID string) error { type FakeOrchardClient struct { } -func (c FakeOrchardClient) Create(wf table.Workflow) (string, error) { +func (c FakeOrchardClient) Create(wf table.Workflow) ([]string, error) { runner := OrchardStdoutRunner{} results, err := runner.Generate(wf.Artifact, wf.Command) if err != nil { - return "", err + return []string{""}, err } log.Println("generating workflow", results) time.Sleep(1 * time.Second) - return fmt.Sprintf("wf-%s", uuid.New().String()), nil + return []string{fmt.Sprintf("wf-%s", uuid.New().String())}, nil } func (c FakeOrchardClient) Activate(wfID string) error { diff --git a/service/control.go b/service/control.go index 0f33fee..b942b63 100644 --- a/service/control.go +++ b/service/control.go @@ -28,14 +28,15 @@ type Control struct { } type postWorkflowReq struct { - Name string `json:"name" binding:"required"` - Artifact string `json:"artifact" binding:"required"` - Command 
string `json:"command" binding:"required"` - Every string `json:"every" binding:"required"` - NextRuntime time.Time `json:"nextRuntime" binding:"required"` - Backfill bool `json:"backfill"` // default false if absent - Owner *string `json:"owner"` - IsActive bool `json:"isActive"` // default false if absent + Name string `json:"name" binding:"required"` + Artifact string `json:"artifact" binding:"required"` + Command string `json:"command" binding:"required"` + Every string `json:"every" binding:"required"` + NextRuntime time.Time `json:"nextRuntime" binding:"required"` + Backfill bool `json:"backfill"` // default false if absent + Owner *string `json:"owner"` + IsActive bool `json:"isActive"` // default false if absent + ScheduleDelayMinutes uint `json:"scheduleDelayMinutes"` } type deleteWorkflowReq struct { @@ -68,20 +69,21 @@ func (ctrl *Control) putWorkflow(c *gin.Context) { } wf := table.Workflow{ - Name: body.Name, - Artifact: body.Artifact, - Command: body.Command, - Every: every, - NextRuntime: body.NextRuntime, - Backfill: body.Backfill, - Owner: body.Owner, - IsActive: body.IsActive, + Name: body.Name, + Artifact: body.Artifact, + Command: body.Command, + Every: every, + NextRuntime: body.NextRuntime, + Backfill: body.Backfill, + Owner: body.Owner, + IsActive: body.IsActive, + ScheduleDelayMinutes: body.ScheduleDelayMinutes, } // upsert workflow ctrl.db.Clauses( clause.OnConflict{ Columns: []clause.Column{{Name: "name"}}, - DoUpdates: clause.AssignmentColumns([]string{"updated_at", "artifact", "command", "every", "next_runtime", "backfill", "owner", "is_active"}), + DoUpdates: clause.AssignmentColumns([]string{"updated_at", "artifact", "command", "every", "next_runtime", "backfill", "owner", "is_active", "schedule_delay_minutes"}), }).Create(&wf) ctrl.db.Unscoped().Model(&wf).Update("deleted_at", nil) c.JSON(http.StatusOK, "OK") diff --git a/service/scheduler.go b/service/scheduler.go index 989ef16..a4ff539 100644 --- a/service/scheduler.go +++ 
b/service/scheduler.go @@ -32,6 +32,7 @@ const ( Deleted DeleteFailed Activated + Created ) func (s ScheduleStatus) ToString() string { @@ -46,7 +47,8 @@ func (s ScheduleStatus) ToString() string { return "delete_failed" case Activated: return "activated" - + case Created: + return "created" } panic("unknown ScheduleStatus") } @@ -65,6 +67,7 @@ func (s *Scheduler) Start() { for range tick { fmt.Println("tick") s.scheduleWorkflows(database.GetInstance()) + s.activateWorkflows(database.GetInstance()) } } @@ -77,7 +80,21 @@ func (s *Scheduler) scheduleWorkflows(db *gorm.DB) { Find(&workflows) for _, wf := range workflows { - go s.lockAndRun(db, wf) + go s.lockAndCreate(db, wf) + } +} + +func (s *Scheduler) activateWorkflows(db *gorm.DB) { + var scheduledWorkflows []table.ScheduledWorkflow + + db.Model(&table.ScheduledWorkflow{}). + Joins("left join workflow_activator_locks l on scheduled_workflows.id = l.scheduled_id"). + Where("start_time <= ? and status = 'created' and l.token is null", time.Now()). + Order("start_time"). 
+ Find(&scheduledWorkflows) + + for _, swf := range scheduledWorkflows { + go s.lockAndActivate(db, swf) } } @@ -118,7 +135,7 @@ func (s *Scheduler) cancelWorkflows( return updatedStatuses } -func (s *Scheduler) createActivateWorkflow( +func (s *Scheduler) createWorkflow( client *orchard.OrchardRestClient, wf table.Workflow, ) map[string]string { @@ -130,18 +147,25 @@ func (s *Scheduler) createActivateWorkflow( return s.deleteWorkflows(client, createdIDs, statuses) } for _, createdID := range createdIDs { - err = client.Activate(createdID) - if err != nil { - fmt.Printf("[error] error activating workflow: %s\n", err) - notifyOwner(wf, err) - return s.cancelWorkflows(client, statuses) - } - statuses[createdID] = Activated.ToString() + statuses[createdID] = Created.ToString() } return statuses } -func (s *Scheduler) lockAndRun(db *gorm.DB, wf table.Workflow) { +func (s *Scheduler) activateWorkflow( + client *orchard.OrchardRestClient, + swf table.ScheduledWorkflow, + wf table.Workflow, +) string { + if err := client.Activate(swf.OrchardID); err != nil { + fmt.Printf("[error] error activating workflow: %s\n", err) + notifyOwner(wf, err) + return swf.Status + } + return Activated.ToString() +} + +func (s *Scheduler) lockAndCreate(db *gorm.DB, wf table.Workflow) { token := uuid.New().String() lock := table.WorkflowSchedulerLock{ @@ -151,7 +175,7 @@ func (s *Scheduler) lockAndRun(db *gorm.DB, wf table.Workflow) { } result := db.Create(&lock) if result.Error != nil { - fmt.Printf("something else is running this workflow %v! skip...\n", wf.ID) + fmt.Printf("something else is creating this workflow %v! skip...\n", wf.ID) return } @@ -159,32 +183,33 @@ func (s *Scheduler) lockAndRun(db *gorm.DB, wf table.Workflow) { db.First(&existingLock, wf.ID) if existingLock.Token != token { - fmt.Printf("something else is running this workflow %v! skip...\n", wf.ID) + fmt.Printf("something else is creating this workflow %v! 
skip...\n", wf.ID) return } - fmt.Println("running workflow", wf.Name, token) + fmt.Println("creating workflow", wf.Name, token) client := &orchard.OrchardRestClient{ Host: s.OrchardHost, APIKeyName: s.OrchardAPIKeyName, APIKey: s.OrchardAPIKey, } - scheduleStatus := s.createActivateWorkflow(client, wf) + scheduleStatus := s.createWorkflow(client, wf) // add to scheduled and update the next run time db.Transaction(func(tx *gorm.DB) error { - now := time.Now() + startTime := time.Now() for orchardID, status := range scheduleStatus { if err := tx.Create(&table.ScheduledWorkflow{ WorkflowID: wf.ID, OrchardID: orchardID, - StartTime: now, + StartTime: startTime, ScheduledStartTime: wf.NextRuntime, Status: status, }).Error; err != nil { return err } + startTime = startTime.Add(time.Duration(wf.ScheduleDelayMinutes) * time.Minute) } fmt.Println(wf.Every) @@ -202,6 +227,53 @@ func (s *Scheduler) lockAndRun(db *gorm.DB, wf table.Workflow) { Delete(&table.WorkflowSchedulerLock{}) } +func (s *Scheduler) lockAndActivate(db *gorm.DB, swf table.ScheduledWorkflow) { + token := uuid.New().String() + + lock := table.WorkflowActivatorLock{ + swf.ID, + token, + time.Now(), + } + result := db.Create(&lock) + if result.Error != nil { + fmt.Printf("something else is activating this scheduled workflow %v! skip...\n", swf.ID) + return + } + + existingLock := table.WorkflowActivatorLock{} + db.First(&existingLock, swf.ID) + + if existingLock.Token != token { + fmt.Printf("something else is activating this scheduled workflow %v! 
skip...\n", swf.ID) + return + } + + fmt.Println("activating workflow", swf.OrchardID, token) + client := &orchard.OrchardRestClient{ + Host: s.OrchardHost, + APIKeyName: s.OrchardAPIKeyName, + APIKey: s.OrchardAPIKey, + } + + wf := table.Workflow{} + db.First(&wf, swf.WorkflowID) + + status := s.activateWorkflow(client, swf, wf) + + // update status in scheduled_workflows table + db.Transaction(func(tx *gorm.DB) error { + if err := tx.Model(&swf).Update("status", status).Error; err != nil { + return err + } + return nil + }) + + // release the lock + db.Where("scheduled_id = ? and token = ?", swf.ID, token). + Delete(&table.WorkflowActivatorLock{}) +} + func notifyOwner(wf table.Workflow, orchardErr error) { errMsg := fmt.Sprintf( "[error] Failed to schedule workflow %v with error %q\n",