Skip to content

Commit

Permalink
support concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Zettat123 committed Jan 15, 2025
1 parent cbf933e commit 8f5948b
Show file tree
Hide file tree
Showing 17 changed files with 1,175 additions and 282 deletions.
149 changes: 49 additions & 100 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook"

"github.com/nektos/act/pkg/jobparser"
"xorm.io/builder"
)

Expand All @@ -47,6 +46,8 @@ type ActionRun struct {
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
ConcurrencyGroup string
ConcurrencyCancel bool
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Expand Down Expand Up @@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool {
return run.ScheduleID > 0
}

func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
_, err := db.GetEngine(ctx).ID(repo.ID).
SetExpr("num_action_runs",
builder.Select("count(*)").From("action_run").
Expand Down Expand Up @@ -222,119 +223,50 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
return err
}

// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}

// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}

// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return fmt.Errorf("job has changed, try again")
}

// Continue with the next job.
continue
}

// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err
}
if err := CancelJobs(ctx, jobs); err != nil {
return err
}
}

// Return nil to indicate successful cancellation of all running and waiting jobs.
return nil
}

// InsertRun inserts a run
// The title will be cut off at 255 characters if it's longer than 255 characters.
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
ctx, committer, err := db.TxContext(ctx)
if err != nil {
return err
}
defer committer.Close()

index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
if err != nil {
return err
}
run.Index = index
run.Title = util.EllipsisDisplayString(run.Title, 255)
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}

if err := db.Insert(ctx, run); err != nil {
return err
}
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

if run.Repo == nil {
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
if err != nil {
return err
}
run.Repo = repo
}
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}

if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return fmt.Errorf("job has changed, try again")
}

runJobs := make([]*ActionRunJob, 0, len(jobs))
var hasWaiting bool
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
return err
// Continue with the next job.
continue
}
payload, _ := v.Marshal()
status := StatusWaiting
if len(needs) > 0 || run.NeedApproval {
status = StatusBlocked
} else {
hasWaiting = true
}
job.Name = util.EllipsisDisplayString(job.Name, 255)
runJobs = append(runJobs, &ActionRunJob{
RunID: run.ID,
RepoID: run.RepoID,
OwnerID: run.OwnerID,
CommitSHA: run.CommitSHA,
IsForkPullRequest: run.IsForkPullRequest,
Name: job.Name,
WorkflowPayload: payload,
JobID: id,
Needs: needs,
RunsOn: job.RunsOn(),
Status: status,
})
}
if err := db.Insert(ctx, runJobs); err != nil {
return err
}

// if there is a job in the waiting status, increase tasks version.
if hasWaiting {
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return err
}
}

return committer.Commit()
return nil
}

func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
Expand Down Expand Up @@ -426,7 +358,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
}
run.Repo = repo
}
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
}
Expand All @@ -435,3 +367,20 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
}

type ActionRunIndex db.ResourceIndex

func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) {
if len(actionRun.ConcurrencyGroup) == 0 || actionRun.ConcurrencyCancel {
return false, nil
}

concurrentRunsNum, err := db.Count[ActionRun](ctx, &FindRunOptions{
RepoID: actionRun.RepoID,
ConcurrencyGroup: actionRun.ConcurrencyGroup,
Status: []Status{StatusWaiting, StatusRunning},
})
if err != nil {
return false, fmt.Errorf("count running and waiting runs: %w", err)
}

return concurrentRunsNum > 0, nil
}
111 changes: 107 additions & 4 deletions models/actions/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ type ActionRunJob struct {
RunsOn []string `xorm:"JSON TEXT"`
TaskID int64 // the latest task of the job
Status Status `xorm:"index"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`

RawConcurrencyGroup string // raw concurrency.group
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
ConcurrencyGroup string // evaluated concurrency.group
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress

Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`
}

func init() {
Expand Down Expand Up @@ -184,3 +191,99 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
return StatusUnknown // it shouldn't happen
}
}

func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
if len(job.RawConcurrencyGroup) == 0 {
return false, nil
}
if !job.IsConcurrencyEvaluated {
return false, ErrUnevaluatedConcurrency{}
}
if len(job.ConcurrencyGroup) == 0 || job.ConcurrencyCancel {
return false, nil
}

concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
RepoID: job.RepoID,
ConcurrencyGroup: job.ConcurrencyGroup,
Statuses: []Status{StatusRunning, StatusWaiting},
})
if err != nil {
return false, fmt.Errorf("count running and waiting jobs: %w", err)
}
if concurrentJobsNum > 0 {
return true, nil
}

if err := job.LoadRun(ctx); err != nil {
return false, fmt.Errorf("load run: %w", err)
}

return ShouldBlockRunByConcurrency(ctx, job.Run)
}

func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) error {
if len(job.RawConcurrencyGroup) > 0 {
if !job.IsConcurrencyEvaluated {
return ErrUnevaluatedConcurrency{}
}
if len(job.ConcurrencyGroup) > 0 && job.ConcurrencyCancel {
// cancel previous jobs in the same concurrency group
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
RepoID: job.RepoID,
ConcurrencyGroup: job.ConcurrencyGroup,
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
if err != nil {
return fmt.Errorf("find previous jobs: %w", err)
}
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
if err := CancelJobs(ctx, previousJobs); err != nil {
return fmt.Errorf("cancel previous jobs: %w", err)
}
}
}

if err := job.LoadRun(ctx); err != nil {
return fmt.Errorf("load run: %w", err)
}
if len(job.Run.ConcurrencyGroup) > 0 && job.Run.ConcurrencyCancel {
// cancel previous runs in the same concurrency group
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
RepoID: job.RepoID,
ConcurrencyGroup: job.Run.ConcurrencyGroup,
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
})
if err != nil {
return fmt.Errorf("find runs: %w", err)
}
for _, run := range runs {
if run.ID == job.Run.ID {
continue
}
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return fmt.Errorf("find run %d jobs: %w", run.ID, err)
}
if err := CancelJobs(ctx, jobs); err != nil {
return fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
}
}
}

return nil
}

type ErrUnevaluatedConcurrency struct {
}

func IsErrUnevaluatedConcurrency(err error) bool {
_, ok := err.(ErrUnevaluatedConcurrency)
return ok
}

func (err ErrUnevaluatedConcurrency) Error() string {
return "the raw concurrency group has not been evaluated"
}
22 changes: 16 additions & 6 deletions models/actions/run_job_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,21 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
return jobs.LoadRuns(ctx, withRepo)
}

func GetRunsByIDs(ctx context.Context, runIDs []int64) (RunList, error) {
runList := make(RunList, 0, len(runIDs))
err := db.GetEngine(ctx).In("id", runIDs).Find(&runList)
return runList, err
}

type FindRunJobOptions struct {
db.ListOptions
RunID int64
RepoID int64
OwnerID int64
CommitSHA string
Statuses []Status
UpdatedBefore timeutil.TimeStamp
RunID int64
RepoID int64
OwnerID int64
CommitSHA string
Statuses []Status
UpdatedBefore timeutil.TimeStamp
ConcurrencyGroup string
}

func (opts FindRunJobOptions) ToConds() builder.Cond {
Expand All @@ -76,5 +83,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
if opts.UpdatedBefore > 0 {
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
}
if opts.ConcurrencyGroup != "" {
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
}
return cond
}
20 changes: 12 additions & 8 deletions models/actions/run_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ func (runs RunList) LoadRepos(ctx context.Context) error {

type FindRunOptions struct {
db.ListOptions
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
RepoID int64
OwnerID int64
WorkflowID string
Ref string // the commit/tag/… that caused this workflow
TriggerUserID int64
TriggerEvent webhook_module.HookEventType
Approved bool // not util.OptionalBool, it works only when it's true
Status []Status
ConcurrencyGroup string
}

func (opts FindRunOptions) ToConds() builder.Cond {
Expand Down Expand Up @@ -99,6 +100,9 @@ func (opts FindRunOptions) ToConds() builder.Cond {
if opts.TriggerEvent != "" {
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
}
if len(opts.ConcurrencyGroup) > 0 {
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
}
return cond
}

Expand Down
Loading

0 comments on commit 8f5948b

Please sign in to comment.