Skip to content

Commit

Permalink
support stagger start time when multiple workflows from orchard runner (
Browse files Browse the repository at this point in the history
#32)

when orchard runner returns multiple workflows, schedule option to
`StaggerStartMinutes` allow activating each orchard workflow with a gap
of the specified minutes.

in scheduler, decoupled the original create and activate from a single
method to 2. create will add stagger start minutes to each of next
workflow as the `start_time`. This data goes to the
`scheduled_workflows` table. A new decoupled method activate workflow
will follow the time tick, and activate upon the start_time.

The schema is updated. And the ws payload is added with an optional
field `StaggerStartMinutes`.
  • Loading branch information
schowsf committed Aug 23, 2023
1 parent b344c48 commit f342c06
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 45 deletions.
1 change: 1 addition & 0 deletions database/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var Tables = []interface{}{
&table.Workflow{},
&table.ScheduledWorkflow{},
&table.WorkflowSchedulerLock{},
&table.WorkflowActivatorLock{},
}

var owner = os.Getenv("OWNER_SNS")
Expand Down
23 changes: 15 additions & 8 deletions database/table/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (

type Workflow struct {
gorm.Model
Name string `gorm:"type:varchar(256);not null;index:workflows_name,unique"`
Artifact string `gorm:"type:varchar(2048);not null"`
Command string `gorm:"type:text;not null"`
Every model.Every `gorm:"type:varchar(64);not null"`
NextRuntime time.Time `gorm:"not null"`
Backfill bool `gorm:"not null"`
Owner *string `gorm:"type:varchar(2048)"`
IsActive bool `gorm:"not null"`
Name string `gorm:"type:varchar(256);not null;index:workflows_name,unique"`
Artifact string `gorm:"type:varchar(2048);not null"`
Command string `gorm:"type:text;not null"`
Every model.Every `gorm:"type:varchar(64);not null"`
NextRuntime time.Time `gorm:"not null"`
Backfill bool `gorm:"not null"`
Owner *string `gorm:"type:varchar(2048)"`
IsActive bool `gorm:"not null"`
ScheduleDelayMinutes uint `gorm:"default:0"`

ScheduledWorkflows []ScheduledWorkflow
}
Expand All @@ -40,3 +41,9 @@ type WorkflowSchedulerLock struct {
Token string `gorm:"type:varchar(64);not null"`
LockTime time.Time `gorm:"not null"`
}

type WorkflowActivatorLock struct {
ScheduledID uint `gorm:"primaryKey"`
Token string `gorm:"type:varchar(64);not null"`
LockTime time.Time `gorm:"not null"`
}
6 changes: 3 additions & 3 deletions orchard/orchard_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ func (c OrchardRestClient) Delete(orchardID string) error {
type FakeOrchardClient struct {
}

func (c FakeOrchardClient) Create(wf table.Workflow) (string, error) {
func (c FakeOrchardClient) Create(wf table.Workflow) ([]string, error) {
runner := OrchardStdoutRunner{}
results, err := runner.Generate(wf.Artifact, wf.Command)
if err != nil {
return "", err
return []string{""}, err
}
log.Println("generating workflow", results)
time.Sleep(1 * time.Second)
return fmt.Sprintf("wf-%s", uuid.New().String()), nil
return []string{fmt.Sprintf("wf-%s", uuid.New().String())}, nil
}

func (c FakeOrchardClient) Activate(wfID string) error {
Expand Down
36 changes: 19 additions & 17 deletions service/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ type Control struct {
}

type postWorkflowReq struct {
Name string `json:"name" binding:"required"`
Artifact string `json:"artifact" binding:"required"`
Command string `json:"command" binding:"required"`
Every string `json:"every" binding:"required"`
NextRuntime time.Time `json:"nextRuntime" binding:"required"`
Backfill bool `json:"backfill"` // default false if absent
Owner *string `json:"owner"`
IsActive bool `json:"isActive"` // default false if absent
Name string `json:"name" binding:"required"`
Artifact string `json:"artifact" binding:"required"`
Command string `json:"command" binding:"required"`
Every string `json:"every" binding:"required"`
NextRuntime time.Time `json:"nextRuntime" binding:"required"`
Backfill bool `json:"backfill"` // default false if absent
Owner *string `json:"owner"`
IsActive bool `json:"isActive"` // default false if absent
ScheduleDelayMinutes uint `json:"scheduleDelayMinutes"`
}

type deleteWorkflowReq struct {
Expand Down Expand Up @@ -68,20 +69,21 @@ func (ctrl *Control) putWorkflow(c *gin.Context) {
}

wf := table.Workflow{
Name: body.Name,
Artifact: body.Artifact,
Command: body.Command,
Every: every,
NextRuntime: body.NextRuntime,
Backfill: body.Backfill,
Owner: body.Owner,
IsActive: body.IsActive,
Name: body.Name,
Artifact: body.Artifact,
Command: body.Command,
Every: every,
NextRuntime: body.NextRuntime,
Backfill: body.Backfill,
Owner: body.Owner,
IsActive: body.IsActive,
ScheduleDelayMinutes: body.ScheduleDelayMinutes,
}
// upsert workflow
ctrl.db.Clauses(
clause.OnConflict{
Columns: []clause.Column{{Name: "name"}},
DoUpdates: clause.AssignmentColumns([]string{"updated_at", "artifact", "command", "every", "next_runtime", "backfill", "owner", "is_active"}),
DoUpdates: clause.AssignmentColumns([]string{"updated_at", "artifact", "command", "every", "next_runtime", "backfill", "owner", "is_active", "schedule_delay_minutes"}),
}).Create(&wf)
ctrl.db.Unscoped().Model(&wf).Update("deleted_at", nil)
c.JSON(http.StatusOK, "OK")
Expand Down
106 changes: 89 additions & 17 deletions service/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
Deleted
DeleteFailed
Activated
Created
)

func (s ScheduleStatus) ToString() string {
Expand All @@ -46,7 +47,8 @@ func (s ScheduleStatus) ToString() string {
return "delete_failed"
case Activated:
return "activated"

case Created:
return "created"
}
panic("unknown ScheduleStatus")
}
Expand All @@ -65,6 +67,7 @@ func (s *Scheduler) Start() {
for range tick {
fmt.Println("tick")
s.scheduleWorkflows(database.GetInstance())
s.activateWorkflows(database.GetInstance())
}
}

Expand All @@ -77,7 +80,21 @@ func (s *Scheduler) scheduleWorkflows(db *gorm.DB) {
Find(&workflows)

for _, wf := range workflows {
go s.lockAndRun(db, wf)
go s.lockAndCreate(db, wf)
}
}

func (s *Scheduler) activateWorkflows(db *gorm.DB) {
var scheduledWorkflows []table.ScheduledWorkflow

db.Model(&table.ScheduledWorkflow{}).
Joins("left join workflow_activator_locks l on scheduled_workflows.id = l.scheduled_id").
Where("start_time <= ? and status = 'created' and l.token is null", time.Now()).
Order("start_time").
Find(&scheduledWorkflows)

for _, swf := range scheduledWorkflows {
go s.lockAndActivate(db, swf)
}
}

Expand Down Expand Up @@ -118,7 +135,7 @@ func (s *Scheduler) cancelWorkflows(
return updatedStatuses
}

func (s *Scheduler) createActivateWorkflow(
func (s *Scheduler) createWorkflow(
client *orchard.OrchardRestClient,
wf table.Workflow,
) map[string]string {
Expand All @@ -130,18 +147,25 @@ func (s *Scheduler) createActivateWorkflow(
return s.deleteWorkflows(client, createdIDs, statuses)
}
for _, createdID := range createdIDs {
err = client.Activate(createdID)
if err != nil {
fmt.Printf("[error] error activating workflow: %s\n", err)
notifyOwner(wf, err)
return s.cancelWorkflows(client, statuses)
}
statuses[createdID] = Activated.ToString()
statuses[createdID] = Created.ToString()
}
return statuses
}

func (s *Scheduler) lockAndRun(db *gorm.DB, wf table.Workflow) {
func (s *Scheduler) activateWorkflow(
client *orchard.OrchardRestClient,
swf table.ScheduledWorkflow,
wf table.Workflow,
) string {
if err := client.Activate(swf.OrchardID); err != nil {
fmt.Printf("[error] error activating workflow: %s\n", err)
notifyOwner(wf, err)
return swf.Status
}
return Activated.ToString()
}

func (s *Scheduler) lockAndCreate(db *gorm.DB, wf table.Workflow) {
token := uuid.New().String()

lock := table.WorkflowSchedulerLock{
Expand All @@ -151,40 +175,41 @@ func (s *Scheduler) lockAndRun(db *gorm.DB, wf table.Workflow) {
}
result := db.Create(&lock)
if result.Error != nil {
fmt.Printf("something else is running this workflow %v! skip...\n", wf.ID)
fmt.Printf("something else is creating this workflow %v! skip...\n", wf.ID)
return
}

existingLock := table.WorkflowSchedulerLock{}
db.First(&existingLock, wf.ID)

if existingLock.Token != token {
fmt.Printf("something else is running this workflow %v! skip...\n", wf.ID)
fmt.Printf("something else is creating this workflow %v! skip...\n", wf.ID)
return
}

fmt.Println("running workflow", wf.Name, token)
fmt.Println("creating workflow", wf.Name, token)
client := &orchard.OrchardRestClient{
Host: s.OrchardHost,
APIKeyName: s.OrchardAPIKeyName,
APIKey: s.OrchardAPIKey,
}

scheduleStatus := s.createActivateWorkflow(client, wf)
scheduleStatus := s.createWorkflow(client, wf)

// add to scheduled and update the next run time
db.Transaction(func(tx *gorm.DB) error {
now := time.Now()
startTime := time.Now()
for orchardID, status := range scheduleStatus {
if err := tx.Create(&table.ScheduledWorkflow{
WorkflowID: wf.ID,
OrchardID: orchardID,
StartTime: now,
StartTime: startTime,
ScheduledStartTime: wf.NextRuntime,
Status: status,
}).Error; err != nil {
return err
}
startTime = startTime.Add(time.Duration(wf.ScheduleDelayMinutes) * time.Minute)
}

fmt.Println(wf.Every)
Expand All @@ -202,6 +227,53 @@ func (s *Scheduler) lockAndRun(db *gorm.DB, wf table.Workflow) {
Delete(&table.WorkflowSchedulerLock{})
}

func (s *Scheduler) lockAndActivate(db *gorm.DB, swf table.ScheduledWorkflow) {
token := uuid.New().String()

lock := table.WorkflowActivatorLock{
swf.ID,
token,
time.Now(),
}
result := db.Create(&lock)
if result.Error != nil {
fmt.Printf("something else is activating this scheduled workflow %v! skip...\n", swf.ID)
return
}

existingLock := table.WorkflowActivatorLock{}
db.First(&existingLock, swf.ID)

if existingLock.Token != token {
fmt.Printf("something else is activating this scheduled workflow %v! skip...\n", swf.ID)
return
}

fmt.Println("activating workflow", swf.OrchardID, token)
client := &orchard.OrchardRestClient{
Host: s.OrchardHost,
APIKeyName: s.OrchardAPIKeyName,
APIKey: s.OrchardAPIKey,
}

wf := table.Workflow{}
db.First(&wf, swf.WorkflowID)

status := s.activateWorkflow(client, swf, wf)

// update status in scheduled_workflows table
db.Transaction(func(tx *gorm.DB) error {
if err := tx.Model(&swf).Update("status", status).Error; err != nil {
return err
}
return nil
})

// release the lock
db.Where("scheduled_id = ? and token = ?", swf.ID, token).
Delete(&table.WorkflowActivatorLock{})
}

func notifyOwner(wf table.Workflow, orchardErr error) {
errMsg := fmt.Sprintf(
"[error] Failed to schedule workflow %v with error %q\n",
Expand Down

0 comments on commit f342c06

Please sign in to comment.