Skip to content

Commit

Permalink
FEATURE: allow specifying on_error handler in pipeline
Browse files Browse the repository at this point in the history
this on_error handler is executed if any other step in
the pipeline failed; and can be e.g. used to trigger a
notification on error.
  • Loading branch information
skurfuerst committed Dec 22, 2023
1 parent 33759d0 commit 0b5f67c
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 17 deletions.
18 changes: 18 additions & 0 deletions definition/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func (d TaskDef) Equals(otherDef TaskDef) bool {
return true
}

// OnErrorTaskDef is a special task definition to be executed solely if an error occurs during "normal" task handling.
type OnErrorTaskDef struct {
// Script is a list of shell commands that are executed if an error occurs in a "normal" task
Script []string `yaml:"script"`

// Env sets/overrides environment variables for this task (takes precedence over pipeline environment)
Env map[string]string `yaml:"env"`
}

type PipelineDef struct {
// Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
Concurrency int `yaml:"concurrency"`
Expand All @@ -62,6 +71,15 @@ type PipelineDef struct {

Tasks map[string]TaskDef `yaml:"tasks"`

// Script to be executed if this pipeline fails, e.g. for notifications.
// In this script, you have the following variables set:
// - failedTaskName: Name of the failed task (key from pipelines.yml)
// - failedTaskExitCode: Exit code of the failed task
// - failedTaskError: Error message of the failed task
// - failedTaskStdout: Stdout of the failed task
// - failedTaskStderr: Stderr of the failed task
OnError OnErrorTaskDef `yaml:"onError"`

// SourcePath stores the source path where the pipeline was defined
SourcePath string
}
Expand Down
146 changes: 129 additions & 17 deletions prunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prunner
import (
"context"
"fmt"
"io"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -418,6 +419,39 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
if jt == nil {
return
}
updateJobTaskStateFromTask(jt, t)

// if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE),
// then we directly abort all other tasks of the job.
// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
// if one task failed, and we want to kill the other tasks.
if jt.Errored {
pipelineDef, found := r.defs.Pipelines[j.Pipeline]
if found && !pipelineDef.ContinueRunningTasksAfterFailure {
log.
WithField("component", "runner").
WithField("jobID", jobIDString).
WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
Debug("Task failed - cancelling all other tasks of the job")
// Use internal cancel since we already have a lock on the mutex
_ = r.cancelJobInternal(jobID)
}

if found && len(pipelineDef.OnError.Script) > 0 {
// we errored; and there is an onError script defined for the
// current pipeline. So let's run it.
r.runOnErrorScript(t, j, pipelineDef.OnError)
}
}

r.requestPersist()
}

// updateJobTaskStateFromTask updates jobTask properties from a given taskCtl task.Task.
// Very internal helper function, to be used in PipelineRunner.HandleTaskChange
// and PipelineRunner.runOnErrorScript.
func updateJobTaskStateFromTask(jt *jobTask, t *task.Task) {
if !t.Start.IsZero() {
start := t.Start
jt.Start = &start
Expand All @@ -437,25 +471,103 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
jt.Error = t.Error
}

// if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE),
// then we directly abort all other tasks of the job.
// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
// if one task failed, and we want to kill the other tasks.
if jt.Errored {
pipelineDef, found := r.defs.Pipelines[j.Pipeline]
if found && !pipelineDef.ContinueRunningTasksAfterFailure {
log.
WithField("component", "runner").
WithField("jobID", jobIDString).
WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
Debug("Task failed - cancelling all other tasks of the job")
// Use internal cancel since we already have a lock on the mutex
_ = r.cancelJobInternal(jobID)
}
}

const OnErrorTaskName = "on_error"

// runOnErrorScript is responsible for running a special "on_error" script in response to an error in the pipeline.
// It exposes variables containing information about the errored task.
//
// The method is triggered with the errored Task t, belonging to pipelineJob j; and pipelineDev
func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorTaskDef definition.OnErrorTaskDef) {
log.
WithField("component", "runner").
WithField("jobID", j.ID.String()).
WithField("pipeline", j.Pipeline).
WithField("failedTaskName", t.Name).
Debug("Triggering onError Script because of task failure")

rc, _ := r.outputStore.Reader(j.ID.String(), t.Name, "stdout")
defer func(rc io.ReadCloser) {
_ = rc.Close()
}(rc)
failedTaskStdout, err := io.ReadAll(rc)

Check failure on line 494 in prunner.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)

rc, _ = r.outputStore.Reader(j.ID.String(), t.Name, "stderr")
defer func(rc io.ReadCloser) {
_ = rc.Close()
}(rc)
failedTaskStderr, err := io.ReadAll(rc)

Check failure on line 500 in prunner.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)

onErrorVariables := make(map[string]interface{})
for key, value := range j.Variables {
onErrorVariables[key] = value
}

Check warning on line 505 in prunner.go

View check run for this annotation

Codecov / codecov/patch

prunner.go#L504-L505

Added lines #L504 - L505 were not covered by tests
onErrorVariables["failedTaskName"] = t.Name
onErrorVariables["failedTaskExitCode"] = t.ExitCode
onErrorVariables["failedTaskError"] = t.Error
onErrorVariables["failedTaskStdout"] = string(failedTaskStdout)
onErrorVariables["failedTaskStderr"] = string(failedTaskStderr)

onErrorJobTask := jobTask{
TaskDef: definition.TaskDef{
Script: onErrorTaskDef.Script,
// AllowFailure needs to be FALSE; otherwise lastError below won't be filled (so errors will not appear in the log)
AllowFailure: false,
Env: onErrorTaskDef.Env,
},
Name: OnErrorTaskName,
Status: toStatus(scheduler.StatusWaiting),
}

r.requestPersist()
// store on task list; so that it appears in pipeline and UI etc
j.Tasks = append(j.Tasks, onErrorJobTask)

onErrorGraph, err := buildPipelineGraph(j.ID, jobTasks{onErrorJobTask}, onErrorVariables)
if err != nil {
log.
WithError(err).
WithField("jobID", j.ID).
WithField("pipeline", j.Pipeline).
Error("Failed to build onError pipeline graph")
onErrorJobTask.Error = err
onErrorJobTask.Errored = true

// the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to
// store it again after modifying it.
j.Tasks[len(j.Tasks)-1] = onErrorJobTask
return
}

Check warning on line 540 in prunner.go

View check run for this annotation

Codecov / codecov/patch

prunner.go#L528-L540

Added lines #L528 - L540 were not covered by tests

// we use a detached taskRunner and scheduler to run the onError task, to
// run synchronously (as we are already in an async goroutine here), won't have any cycles,
// and to simplify the code.
taskRunner := r.createTaskRunner(j)
sched := taskctl.NewScheduler(taskRunner)

// Now, actually run the job synchronously
lastErr := sched.Schedule(onErrorGraph)

// Update job status as with normal jobs
onErrorJobTask.Status = toStatus(onErrorGraph.Nodes()[OnErrorTaskName].ReadStatus())
updateJobTaskStateFromTask(&onErrorJobTask, onErrorGraph.Nodes()[OnErrorTaskName].Task)

if lastErr != nil {
log.
WithError(err).
WithField("jobID", j.ID).
WithField("pipeline", j.Pipeline).
Error("Error running the onError handler")
} else {
log.
WithField("jobID", j.ID).
WithField("pipeline", j.Pipeline).
Debug("Successfully ran the onError handler")
}

// the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to
// store it again after modifying it.
j.Tasks[len(j.Tasks)-1] = onErrorJobTask
}

// HandleStageChange will be called when the stage state changes in the scheduler
Expand Down
131 changes: 131 additions & 0 deletions prunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,137 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) {
assert.Equal(t, "from task,from pipeline,from process", string(taskVarTaskOutput), "output of task_var")
}

func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
"erroring_script": {
// Concurrency of 1 is the default for a single concurrent execution
Concurrency: 1,
QueueLimit: nil,
Tasks: map[string]definition.TaskDef{
"a": {
Script: []string{"echo A"},
},
"b": {
Script: []string{
"echo stdoutContent",
"echo This message goes to stderr >&2",
"exit 42",
},
},
"wait": {
DependsOn: []string{"a", "b"},
},
},
OnError: definition.OnErrorTaskDef{
Script: []string{
"echo ON_ERROR",
"echo 'Failed Task Name: {{ .failedTaskName }}'",
"echo 'Failed Task Exit Code: {{ .failedTaskExitCode }}'",
"echo 'Failed Task Error: {{ .failedTaskError }}'",
"echo 'Failed Task Stdout: {{ .failedTaskStdout }}'",
"echo 'Failed Task Stderr: {{ .failedTaskStderr }}'",
},
},
SourcePath: "fixtures",
},
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockOutputStore := test.NewMockOutputStore()
pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner {
// Use a real runner here to test the actual processing of a task.Task
taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore)
return taskRunner
}, nil, mockOutputStore)
require.NoError(t, err)

job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{})
require.NoError(t, err)

waitForCompletedJob(t, pRunner, job.ID)
res := mockOutputStore.GetBytes(job.ID.String(), "on_error", "stdout")
assert.Error(t, job.LastError)
assert.Equal(t, `ON_ERROR
Failed Task Name: b
Failed Task Exit Code: 42
Failed Task Error: exit status 42
Failed Task Stdout: stdoutContent
Failed Task Stderr: This message goes to stderr
`, string(res))

jt := job.Tasks.ByName("on_error")
if assert.NotNil(t, jt) {
assert.False(t, jt.Canceled, "onError task was not marked as canceled")
assert.False(t, jt.Errored, "task was not marked as errored")
assert.Equal(t, "done", jt.Status, "task has status done")
assert.Nil(t, jt.Error, "task has no error set")
}
}

func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndSetsStateCorrectlyIfErrorHookFails(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
"erroring_script": {
// Concurrency of 1 is the default for a single concurrent execution
Concurrency: 1,
QueueLimit: nil,
Tasks: map[string]definition.TaskDef{
"a": {
Script: []string{"echo A"},
},
"b": {
Script: []string{
"echo stdoutContent",
"echo This message goes to stderr >&2",
"exit 42",
},
},
"wait": {
DependsOn: []string{"a", "b"},
},
},
OnError: definition.OnErrorTaskDef{
Script: []string{
"echo ON_ERROR",
"exit 1",
},
},
SourcePath: "fixtures",
},
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockOutputStore := test.NewMockOutputStore()
pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner {
// Use a real runner here to test the actual processing of a task.Task
taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore)
return taskRunner
}, nil, mockOutputStore)
require.NoError(t, err)

job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{})
require.NoError(t, err)

waitForCompletedJob(t, pRunner, job.ID)
assert.Error(t, job.LastError)

jt := job.Tasks.ByName("on_error")
if assert.NotNil(t, jt) {
assert.False(t, jt.Canceled, "onError task was not marked as canceled")
assert.True(t, jt.Errored, "task was not marked as errored")
assert.Equal(t, "error", jt.Status, "task has status done")
assert.NotNil(t, jt.Error, "task has no error set")
}
}
func TestPipelineRunner_CancelJob_WithRunningJob(t *testing.T) {
var defs = &definition.PipelinesDef{
Pipelines: map[string]definition.PipelineDef{
Expand Down

0 comments on commit 0b5f67c

Please sign in to comment.