diff --git a/models/actions/run.go b/models/actions/run.go index 732fb48bb9a61..546d13106d3d8 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -46,6 +46,7 @@ type ActionRun struct { TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow Status Status `xorm:"index"` Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed + ConcurrencyGroup string // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 Started timeutil.TimeStamp Stopped timeutil.TimeStamp @@ -195,13 +196,20 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err // It's useful when a new run is triggered, and all previous runs needn't be continued anymore. func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error { // Find all runs in the specified repository, reference, and workflow with non-final status - runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{ + opts := &FindRunOptions{ RepoID: repoID, Ref: ref, WorkflowID: workflowID, TriggerEvent: event, Status: []Status{StatusRunning, StatusWaiting, StatusBlocked}, - }) + } + return CancelPreviousJobsWithOpts(ctx, opts) +} + +// CancelPreviousJobs cancels all previous jobs with opts +func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error { + // Find all runs by opts + runs, total, err := db.FindAndCount[ActionRun](ctx, opts) if err != nil { return err } @@ -262,7 +270,7 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin // InsertRun inserts a run // The title will be cut off at 255 characters if it's longer than 255 characters. -func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error { +func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow, blockedByConcurrency bool) error { ctx, committer, err := db.TxContext(ctx) if err != nil { return err diff --git a/models/actions/run_list.go b/models/actions/run_list.go index 4046c7d369436..0cf7a371518a2 100644 --- a/models/actions/run_list.go +++ b/models/actions/run_list.go @@ -63,14 +63,16 @@ func (runs RunList) LoadRepos(ctx context.Context) error { type FindRunOptions struct { db.ListOptions - RepoID int64 - OwnerID int64 - WorkflowID string - Ref string // the commit/tag/… that caused this workflow - TriggerUserID int64 - TriggerEvent webhook_module.HookEventType - Approved bool // not util.OptionalBool, it works only when it's true - Status []Status + RepoID int64 + OwnerID int64 + WorkflowID string + Ref string // the commit/tag/… that caused this workflow + TriggerUserID int64 + TriggerEvent webhook_module.HookEventType + Approved bool // not util.OptionalBool, it works only when it's true + Status []Status + SortType string + ConcurrencyGroup string } func (opts FindRunOptions) ToConds() builder.Cond { @@ -99,11 +101,21 @@ func (opts FindRunOptions) ToConds() builder.Cond { if opts.TriggerEvent != "" { cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent}) } + if len(opts.ConcurrencyGroup) > 0 { + cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup}) + } return cond } func (opts FindRunOptions) ToOrders() string { - return "`id` DESC" + switch opts.SortType { + case "oldest": + return "created_unix ASC" + case "newest": + return "created_unix DESC" + default: + return "`id` DESC" + } } type StatusInfo struct { diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 52d10c4fe83a5..b558be8777975 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -369,6 +369,7 @@ func prepareMigrationTasks() []*migration { newMigration(309, "Improve Notification table indices", v1_23.ImproveNotificationTableIndices), newMigration(310, "Add Priority to ProtectedBranch", v1_23.AddPriorityToProtectedBranch), newMigration(311, "Add TimeEstimate to Issue table", v1_23.AddTimeEstimateColumnToIssueTable), + // TODO: add AddConcurrencyGroupToActionRun } return preparedMigrations } diff --git a/models/migrations/v1_23/v312.go b/models/migrations/v1_23/v312.go new file mode 100644 index 0000000000000..63d93b981d28d --- /dev/null +++ b/models/migrations/v1_23/v312.go @@ -0,0 +1,16 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_23 //nolint + +import ( + "xorm.io/xorm" +) + +func AddConcurrencyGroupToActionRun(x *xorm.Engine) error { + type ActionRun struct { + ConcurrencyGroup string + } + + return x.Sync(new(ActionRun)) +} diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 0f0d7d1ebdc1a..5c0e1dbe74a4c 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -893,7 +893,7 @@ func Run(ctx *context_module.Context) { } // Insert the action run and its associated jobs into the database - if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + if err := actions_model.InsertRun(ctx, run, workflows, false); err != nil { ctx.ServerError("workflow", err) return } diff --git a/services/actions/concurrency.go b/services/actions/concurrency.go new file mode 100644 index 0000000000000..59fa4e1fa3e80 --- /dev/null +++ b/services/actions/concurrency.go @@ -0,0 +1,21 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + + actions_model "code.gitea.io/gitea/models/actions" +) + +func CancelActionRunByConcurrency(ctx context.Context, run *actions_model.ActionRun) error { + return actions_model.CancelPreviousJobsWithOpts(ctx, &actions_model.FindRunOptions{ + ConcurrencyGroup: run.ConcurrencyGroup, + Status: []actions_model.Status{ + actions_model.StatusRunning, + actions_model.StatusWaiting, + actions_model.StatusBlocked, + }, + }) +} diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 1f859fcf70506..ad331486b43e8 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -37,13 +37,57 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { ctx := graceful.GetManager().ShutdownContext() var ret []*jobUpdate for _, update := range items { - if err := checkJobsOfRun(ctx, update.RunID); err != nil { + if err := checkJobsByRunID(ctx, update.RunID); err != nil { ret = append(ret, update) } } return ret } +func checkJobsByRunID(ctx context.Context, runID int64) error { + run, exist, err := db.GetByID[actions_model.ActionRun](ctx, runID) + if err != nil { + return fmt.Errorf("get action run: %w", err) + } + if !exist { + return fmt.Errorf("action run %d does not exist", runID) + } + + return db.WithTx(ctx, func(ctx context.Context) error { + // check jobs of the current run + if err := checkJobsOfRun(ctx, runID); err != nil { + return err + } + + // check jobs by the concurrency group of the run + if len(run.ConcurrencyGroup) == 0 { + return nil + } + concurrentActionRuns, err := db.Find[actions_model.ActionRun](ctx, &actions_model.FindRunOptions{ + ConcurrencyGroup: run.ConcurrencyGroup, + Status: []actions_model.Status{ + actions_model.StatusRunning, + actions_model.StatusWaiting, + actions_model.StatusBlocked, + }, + SortType: "oldest", + }) + if err != nil { + return fmt.Errorf("find action run with concurrency group %s: %w", run.ConcurrencyGroup, err) + } + for _, cRun := range concurrentActionRuns { + if cRun.NeedApproval { + continue + } + if err := checkJobsOfRun(ctx, cRun.ID); err != nil { + return err + } + break // only run one blocked action run with the same concurrency group + } + return nil + }) +} + func checkJobsOfRun(ctx context.Context, runID int64) error { jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID}) if err != nil { diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 30c41a435fe86..f8e4e03e7f3d3 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -332,10 +332,18 @@ func handleWorkflows( continue } + // check workflow concurrency wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil)) - concurrencyGroup, concurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.Concurrency, wfGitCtx, vars) - _, _ = concurrencyGroup, concurrencyCancel - // TODO: check concurrencyGroup and concurrencyCancel + wfConcurrencyGroup, wfConcurrencyCancel := jobparser.InterpolateWorkflowConcurrency(dwf.Concurrency, wfGitCtx, vars) + if len(wfConcurrencyGroup) > 0 { + run.ConcurrencyGroup = wfConcurrencyGroup + if wfConcurrencyCancel { + if err := CancelActionRunByConcurrency(ctx, run); err != nil { + log.Error("CancelActionRunByConcurrency: %v", err) + } + } + } + jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars)) if err != nil { log.Error("jobparser.Parse: %v", err) @@ -356,7 +364,7 @@ func handleWorkflows( } } - if err := actions_model.InsertRun(ctx, run, jobs); err != nil { + if err := actions_model.InsertRun(ctx, run, jobs, !wfConcurrencyCancel); err != nil { log.Error("InsertRun: %v", err) continue } diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go index 18f3324fd2c26..4378c165946ca 100644 --- a/services/actions/schedule_tasks.go +++ b/services/actions/schedule_tasks.go @@ -145,7 +145,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) } // Insert the action run and its associated jobs into the database - if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + if err := actions_model.InsertRun(ctx, run, workflows, false); err != nil { return err }