Commit

Merge pull request #424
Improve handling of failed JFR TaskRuns

anbrsap authored Nov 28, 2023
2 parents 34199a0 + 2b31da8 commit 5e6a83c
Showing 16 changed files with 1,643 additions and 927 deletions.
19 changes: 19 additions & 0 deletions changelog.yaml
@@ -83,6 +83,25 @@
- Tekton to v0.53.2
pullRequestNumber: 426

- type: bug
impact: patch
title: Recreate JFR TaskRun if pod creation failed
description: |-
The creation of the JFR pod may temporarily fail, e.g. due
to a timeout calling a mandatory admission webhook.
Steward now detects this and recreates the Tekton taskrun
to retry.
pullRequestNumber: 424

- type: bug
impact: patch
title: Stop waiting for finished non-restartable JFR TaskRun
description: |-
If a JFR TaskRun was never started, is finished and is not
restartable, Steward now fails the PipelineRun instead of
waiting until timeout.
pullRequestNumber: 424

- type: bug
impact: patch
title: Fix error detected by checkmarx tool
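The two bug-fix entries above both concern the controller's waiting state, which the pkg/runctl/controller.go changes further down rework. As a rough orientation, the following is a minimal, self-contained Go sketch of that decision flow; the TaskRunState type and decide function are hypothetical simplifications for illustration and are not part of the Steward codebase.

package main

import "fmt"

// TaskRunState is a hypothetical, simplified view of the JFR TaskRun
// while a Steward pipeline run is in the waiting state.
type TaskRunState struct {
	Exists      bool // the Tekton TaskRun object still exists
	Started     bool // the JFR pod has started
	Finished    bool // the TaskRun reached a terminal condition
	Restartable bool // the failure allows recreating the TaskRun
	TimedOut    bool // the waiting timeout has elapsed
}

// decide summarizes the waiting-state handling this commit introduces:
// recreate the TaskRun when it is gone or restartable, fail fast when it
// finished without ever starting and cannot be restarted, and otherwise
// keep waiting until the timeout.
func decide(s TaskRunState) string {
	switch {
	case !s.Exists:
		return "create the TaskRun again (recreate after deletion)"
	case s.Started:
		return "move the pipeline run to state 'running'"
	case s.TimedOut:
		return "fail: main pod has not started before the waiting timeout"
	case s.Finished && s.Restartable:
		return "delete and recreate the TaskRun (e.g. pod creation failed temporarily)"
	case s.Finished:
		return "fail the pipeline run: TaskRun finished without ever starting"
	default:
		return "keep waiting and requeue"
	}
}

func main() {
	fmt.Println(decide(TaskRunState{Exists: true, Finished: true, Restartable: true}))
	fmt.Println(decide(TaskRunState{Exists: true, Finished: true}))
}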
1 change: 0 additions & 1 deletion go.mod
@@ -5,7 +5,6 @@ go 1.21
require (
github.com/benbjohnson/clock v1.3.5
github.com/davecgh/go-spew v1.1.1
github.com/ghodss/yaml v1.0.0
github.com/go-logr/logr v1.3.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.4.0
1 change: 0 additions & 1 deletion go.sum
@@ -685,7 +685,6 @@ github.com/evanphx/json-patch/v5 v5.7.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq
github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g=
github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks=
4 changes: 2 additions & 2 deletions pkg/k8s/pipelineRun.go
@@ -351,8 +351,8 @@ func (r *pipelineRun) UpdateContainer(ctx context.Context, newContainerState *co
// StoreErrorAsMessage implements part of interface `PipelineRun`.
func (r *pipelineRun) StoreErrorAsMessage(ctx context.Context, err error, prefix string) error {
if err != nil {
text := fmt.Sprintf("ERROR: %s [%s]: %s", utils.Trim(prefix), r.String(), err.Error())
r.UpdateMessage(text)
msg := fmt.Sprintf("ERROR: %s: %s", utils.Trim(prefix), err.Error())
r.UpdateMessage(msg)
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/k8s/pipelineRun_test.go
@@ -162,7 +162,7 @@ func Test_pipelineRun_StoreErrorAsMessage(t *testing.T) {
client := factory.StewardV1alpha1().PipelineRuns(ns1)
run, err = client.Get(ctx, "foo", metav1.GetOptions{})
assert.NilError(t, err)
assert.Equal(t, "ERROR: message1 [PipelineRun{name: foo, namespace: namespace1, state: running}]: error1", run.Status.Message)
assert.Equal(t, "ERROR: message1: error1", run.Status.Message)
}

func Test_pipelineRun_HasDeletionTimestamp_false(t *testing.T) {
11 changes: 2 additions & 9 deletions pkg/runctl/constants/constants.go
@@ -5,14 +5,7 @@ import (
)

const (

// RunClusterRoleName is the name of the cluster role
// RunClusterRoleName is the name of the cluster role that
// pipeline run service accounts are bound to.
RunClusterRoleName k8s.RoleName = "steward-run"

// JFRStepName is the name of the jfs step
JFRStepName = "step-jenkinsfile-runner"

// TektonTaskRunName is the name of the Tekton TaskRun in each
// run namespace.
TektonTaskRunName = "steward-jenkinsfile-runner"
)
95 changes: 65 additions & 30 deletions pkg/runctl/controller.go
@@ -6,6 +6,7 @@ package runctl

import (
"context"
"errors"
"fmt"
"time"

@@ -83,7 +84,7 @@
}

type controllerTesting struct {
createRunManagerStub run.Manager
createRunManagerStub func(k8s.PipelineRun) run.Manager
newRunManagerStub func(k8s.ClientFactory, secrets.SecretProvider) run.Manager
loadPipelineRunsConfigStub func(ctx context.Context) (*cfg.PipelineRunsConfigStruct, error)
isMaintenanceModeStub func(ctx context.Context) (bool, error)
@@ -292,7 +293,7 @@ func (c *Controller) changeState(ctx context.Context, pipelineRun k8s.PipelineRu

func (c *Controller) createRunManager(pipelineRun k8s.PipelineRun) run.Manager {
if c.testing != nil && c.testing.createRunManagerStub != nil {
return c.testing.createRunManagerStub
return c.testing.createRunManagerStub(pipelineRun)
}
namespace := pipelineRun.GetNamespace()
secretsClient := c.factory.CoreV1().Secrets(namespace)
@@ -305,7 +306,7 @@ func (c *Controller) newRunManager(workFactory k8s.ClientFactory, secretProvider
return c.testing.newRunManagerStub(workFactory, secretProvider)

}
return runmgr.NewRunManager(workFactory, secretProvider)
return runmgr.NewTektonRunManager(workFactory, secretProvider)
}

func (c *Controller) loadPipelineRunsConfig(ctx context.Context) (*cfg.PipelineRunsConfigStruct, error) {
@@ -444,7 +445,7 @@ func (c *Controller) handlePipelineRunFinalizerAndDeletion(
if pipelineRun.HasDeletionTimestamp() {
logger.V(3).Info("Unfinished pipeline run was deleted")
runManager := c.createRunManager(pipelineRun)
err := runManager.Cleanup(ctx, pipelineRun)
err := runManager.DeleteEnv(ctx, pipelineRun)
if err != nil {
c.eventRecorder.Event(pipelineRun.GetReference(), corev1.EventTypeWarning, api.EventReasonCleaningFailed, err.Error())
return true, err
@@ -531,7 +532,7 @@ func (c *Controller) handlePipelineRunPrepare(
"failed to load configuration for pipeline runs",
)
}
namespace, auxNamespace, err := runManager.Prepare(ctx, pipelineRun, pipelineRunsConfig)
namespace, auxNamespace, err := runManager.CreateEnv(ctx, pipelineRun, pipelineRunsConfig)
if err != nil {
c.eventRecorder.Event(pipelineRun.GetReference(), corev1.EventTypeWarning, api.EventReasonPreparingFailed, err.Error())
resultClass := serrors.GetClass(err)
@@ -585,42 +586,68 @@ func (c *Controller) handlePipelineRunWaiting(
"failed to load configuration for pipeline runs",
)
}
// Check for wait timeout
startTime := pipelineRun.GetStatus().StateDetails.StartedAt
timeout := c.getWaitTimeout(pipelineRunsConfig)
if startTime.Add(timeout.Duration).Before(time.Now()) {

waitingTimeout := c.getWaitTimeout(pipelineRunsConfig)

isWaitingTimeout := func() bool {
waitingStartTime := pipelineRun.GetStatus().StateDetails.StartedAt
return waitingStartTime.Add(waitingTimeout.Duration).Before(time.Now())
}

failOnWaitingTimeout := func() (bool, error) {
err := fmt.Errorf(
"main pod has not started after %s",
timeout.Duration,
waitingTimeout.Duration,
)
return true, c.handleResultError(ctx, pipelineRun, api.ResultErrorInfra, errorMessageWaitingFailed, err)
}

if run == nil {
// recreate after deletion (restart)
return true, c.startPipelineRun(ctx, runManager, pipelineRun, pipelineRunsConfig)
} else if run.IsRestartable() {
c.eventRecorder.Event(pipelineRun.GetReference(), corev1.EventTypeWarning, api.EventReasonWaitingFailed, "restarting")
return c.restart(ctx, runManager, pipelineRun)
} else if run.IsDeleted() {
// deletion is still pending
// wait until object vanished before creating a new one with the same name
if isWaitingTimeout() {
return failOnWaitingTimeout()
}
if workqueueKey := c.getWorkqueueKey(pipelineRun.GetAPIObject()); workqueueKey != "" {
c.workqueue.AddAfter(workqueueKey, 1*time.Second)
}
return true, nil
}

startTime := run.GetStartTime()
if startTime != nil {
return true, c.changeAndCommitStateAndMeter(ctx, pipelineRun, api.StateRunning, *startTime)
}

if isWaitingTimeout() {
return failOnWaitingTimeout()
}

started := run.GetStartTime()
if started != nil {
if err := c.changeAndCommitStateAndMeter(ctx, pipelineRun, api.StateRunning, *started); err != nil {
return true, err
taskRunFinished, _ := run.IsFinished()
if taskRunFinished /* without having started */ {
if run.IsRestartable() {
c.eventRecorder.Event(pipelineRun.GetReference(), corev1.EventTypeWarning, api.EventReasonWaitingFailed, "restarting")
return c.restartPipelineRun(ctx, runManager, pipelineRun)
}
err := errors.New("failed to start task run")
return true, c.handleResultError(ctx, pipelineRun, api.ResultErrorInfra, errorMessageWaitingFailed, err)
}

// TODO return (false, nil) to continue with next phase
return true, nil
}
return false, nil
}

func (c *Controller) startPipelineRun(ctx context.Context,
func (c *Controller) startPipelineRun(
ctx context.Context,
runManager run.Manager,
pipelineRun k8s.PipelineRun,
pipelineRunsConfig *cfg.PipelineRunsConfigStruct) error {
if err := runManager.Start(ctx, pipelineRun, pipelineRunsConfig); err != nil {
pipelineRunsConfig *cfg.PipelineRunsConfigStruct,
) error {
if err := runManager.CreateRun(ctx, pipelineRun, pipelineRunsConfig); err != nil {
c.eventRecorder.Event(pipelineRun.GetReference(), corev1.EventTypeWarning, api.EventReasonWaitingFailed, err.Error())
resultClass := serrors.GetClass(err)
// In case we have a result we can cleanup. Otherwise we retry in the next iteration.
@@ -632,7 +659,7 @@ func (c *Controller) startPipelineRun(ctx context.Context,
return nil
}

func (c *Controller) restart(
func (c *Controller) restartPipelineRun(
ctx context.Context,
runManager run.Manager,
pipelineRun k8s.PipelineRun,
@@ -641,7 +668,10 @@ func (c *Controller) restart(
if serrors.IsRecoverable(err) {
return true, err
}
return true, c.handleResultError(ctx, pipelineRun, api.ResultErrorInfra, "run deletion for restart failed", err)
return true, c.handleResultError(ctx, pipelineRun, api.ResultErrorInfra, errorMessageWaitingFailed+": could not restart stuck task run", err)
}
if workqueueKey := c.getWorkqueueKey(pipelineRun.GetAPIObject()); workqueueKey != "" {
c.workqueue.AddAfter(workqueueKey, 1*time.Second)
}
return true, nil
}
@@ -694,7 +724,7 @@ func (c *Controller) handlePipelineRunCleaning(
if pipelineRun.GetStatus().State == api.StateCleaning {
logger.V(3).Info("Cleaning up pipeline execution")

err := runManager.Cleanup(ctx, pipelineRun)
err := runManager.DeleteEnv(ctx, pipelineRun)
if err != nil {
c.eventRecorder.Event(pipelineRun.GetReference(), corev1.EventTypeWarning, api.EventReasonCleaningFailed, err.Error())
return true, err
@@ -801,14 +831,19 @@ func (c *Controller) handlePipelineRunAbort(ctx context.Context, pipelineRun k8s
}

func (c *Controller) addToWorkqueue(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
if key := c.getWorkqueueKey(obj); key != "" {
c.workqueue.Add(key)
c.logger.V(4).Info("Added item to workqueue", "key", key)
}
}

func (c *Controller) getWorkqueueKey(obj interface{}) string {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(err)
return
return ""
}
c.workqueue.Add(key)
c.logger.V(4).Info("Added item to workqueue", "key", key)
return key
}

// addToWorkqueueFromAssociated takes any resource implementing metav1.Object and attempts
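The reworked waiting-state handling above no longer blocks while a TaskRun deletion is still pending; it re-enqueues the pipeline run key with a short delay instead (see getWorkqueueKey and workqueue.AddAfter). Below is a minimal sketch of that delayed-requeue pattern using client-go's workqueue package; the queue setup and the key value are illustrative assumptions, not code from this repository.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Rate-limited work queue, as commonly used by controllers.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()

	// Instead of blocking until the old TaskRun object is gone,
	// re-enqueue the key and look at the pipeline run again later.
	key := "namespace1/pipelinerun1" // illustrative key, normally from cache.MetaNamespaceKeyFunc
	queue.AddAfter(key, 1*time.Second)

	// A controller worker would call Get in a loop; Get blocks until
	// the delayed item becomes available (roughly one second here).
	item, shutdown := queue.Get()
	if !shutdown {
		fmt.Println("processing", item)
		queue.Done(item)
	}
}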