From 870a18051f059633a7073d270222924ddee6d779 Mon Sep 17 00:00:00 2001 From: Marcus Holl Date: Tue, 17 Aug 2021 10:24:33 +0200 Subject: [PATCH] [CLOUDCIFEAT1-130] Avoid non-atomic updates (#248) Up to now, each and every change was "committed" directly. Now we apply several changes to the memory representation of a pipeline run and "commit" these changes together. With that there is no (short) period of time in which the pipeline run is inconsistent. There are also a lot of other, more or less related changes (e.g. affecting error handling). Co-authored-by: Matthias Rinck --- changelog.yaml | 17 ++ charts/steward/Chart.yaml | 4 +- pkg/apis/steward/v1alpha1/constants.go | 4 + pkg/k8s/mocks/mocks.go | 75 ++++----- pkg/k8s/pipelineRun.go | 205 +++++++++++++++---------- pkg/k8s/pipelineRun_test.go | 175 ++++++++++----------- pkg/runctl/controller.go | 193 ++++++++++++----------- pkg/runctl/controller_test.go | 25 ++- pkg/runctl/run.go | 5 + pkg/runctl/run/interfaces.go | 3 +- pkg/runctl/run/mocks/mocks.go | 22 ++- pkg/runctl/run_manager.go | 78 +++++----- pkg/runctl/run_manager_test.go | 135 +++++++++++----- 13 files changed, 548 insertions(+), 393 deletions(-) diff --git a/changelog.yaml b/changelog.yaml index 17e84370..02d274f2 100644 --- a/changelog.yaml +++ b/changelog.yaml @@ -51,12 +51,29 @@ - version: NEXT date: TBD changes: + + - type: bug + impact: minor + title: Avoid non-atomic status updates + description: |- + Without this change the state might be updated e.g. to a final state + without setting a corresponding result. The result is provided a short period + of time later with another update. In the meantime we have an invalid state. + With this change we apply both changes to the memory representation of a + pipeline run and send the update only once. With this approach there is no + short period of time with an invalid state. + warning: |- + Needs to be validated carefully since this is a bigger refactoring. + pullRequestNumber: 248 + jiraIssueNumber: CLOUDCIFEAT1-130 + - type: bug impact: patch title: Fix deletion bug #241 description: |- When a pipeline run was deleted the state and the result were not updated in some edge cases. This is fixed now. pullRequestNumber: 250 + - version: "0.12.1" date: 2021-07-28 changes: diff --git a/charts/steward/Chart.yaml b/charts/steward/Chart.yaml index 078c952a..4c7cb54f 100644 --- a/charts/steward/Chart.yaml +++ b/charts/steward/Chart.yaml @@ -5,8 +5,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 0.12.2-dev +version: 0.13.0-dev # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 0.12.2-dev +appVersion: 0.13.0-dev diff --git a/pkg/apis/steward/v1alpha1/constants.go b/pkg/apis/steward/v1alpha1/constants.go index b7c40a49..ba445e52 100644 --- a/pkg/apis/steward/v1alpha1/constants.go +++ b/pkg/apis/steward/v1alpha1/constants.go @@ -78,6 +78,10 @@ const ( // faces an intermittent error during running phase. EventReasonRunningFailed = "RunningFailed" + // EventReasonCleaningFailed is the reason for an event occurring when the run controller + // faces an intermittent error during cleanup phase. + EventReasonCleaningFailed = "CleaningFailed" + // EventReasonLoadPipelineRunsConfigFailed is the reason for an event occuring when the // loading of the pipeline runs configuration fails.
EventReasonLoadPipelineRunsConfigFailed = "LoadPipelineRunsConfigFailed" diff --git a/pkg/k8s/mocks/mocks.go b/pkg/k8s/mocks/mocks.go index 306db0a4..58cb18b8 100644 --- a/pkg/k8s/mocks/mocks.go +++ b/pkg/k8s/mocks/mocks.go @@ -35,9 +35,10 @@ import ( externalversions0 "github.com/SAP/stewardci-core/pkg/tektonclient/informers/externalversions" gomock "github.com/golang/mock/gomock" v1 "k8s.io/api/core/v1" + v10 "k8s.io/apimachinery/pkg/apis/meta/v1" dynamic "k8s.io/client-go/dynamic" - v10 "k8s.io/client-go/kubernetes/typed/core/v1" - v11 "k8s.io/client-go/kubernetes/typed/networking/v1" + v11 "k8s.io/client-go/kubernetes/typed/core/v1" + v12 "k8s.io/client-go/kubernetes/typed/networking/v1" v1beta10 "k8s.io/client-go/kubernetes/typed/rbac/v1beta1" reflect "reflect" ) @@ -66,10 +67,10 @@ func (m *MockClientFactory) EXPECT() *MockClientFactoryMockRecorder { } // CoreV1 mocks base method -func (m *MockClientFactory) CoreV1() v10.CoreV1Interface { +func (m *MockClientFactory) CoreV1() v11.CoreV1Interface { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CoreV1") - ret0, _ := ret[0].(v10.CoreV1Interface) + ret0, _ := ret[0].(v11.CoreV1Interface) return ret0 } @@ -94,10 +95,10 @@ func (mr *MockClientFactoryMockRecorder) Dynamic() *gomock.Call { } // NetworkingV1 mocks base method -func (m *MockClientFactory) NetworkingV1() v11.NetworkingV1Interface { +func (m *MockClientFactory) NetworkingV1() v12.NetworkingV1Interface { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "NetworkingV1") - ret0, _ := ret[0].(v11.NetworkingV1Interface) + ret0, _ := ret[0].(v12.NetworkingV1Interface) return ret0 } @@ -266,6 +267,21 @@ func (mr *MockPipelineRunMockRecorder) AddFinalizer() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddFinalizer", reflect.TypeOf((*MockPipelineRun)(nil).AddFinalizer)) } +// CommitStatus mocks base method +func (m *MockPipelineRun) CommitStatus() ([]*v1alpha1.StateItem, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CommitStatus") + ret0, _ := ret[0].([]*v1alpha1.StateItem) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CommitStatus indicates an expected call of CommitStatus +func (mr *MockPipelineRunMockRecorder) CommitStatus() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommitStatus", reflect.TypeOf((*MockPipelineRun)(nil).CommitStatus)) +} + // DeleteFinalizerIfExists mocks base method func (m *MockPipelineRun) DeleteFinalizerIfExists() error { m.ctrl.T.Helper() @@ -464,11 +480,9 @@ func (mr *MockPipelineRunMockRecorder) String() *gomock.Call { } // UpdateAuxNamespace mocks base method -func (m *MockPipelineRun) UpdateAuxNamespace(arg0 string) error { +func (m *MockPipelineRun) UpdateAuxNamespace(arg0 string) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateAuxNamespace", arg0) - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "UpdateAuxNamespace", arg0) } // UpdateAuxNamespace indicates an expected call of UpdateAuxNamespace @@ -478,11 +492,9 @@ func (mr *MockPipelineRunMockRecorder) UpdateAuxNamespace(arg0 interface{}) *gom } // UpdateContainer mocks base method -func (m *MockPipelineRun) UpdateContainer(arg0 *v1.ContainerState) error { +func (m *MockPipelineRun) UpdateContainer(arg0 *v1.ContainerState) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateContainer", arg0) - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "UpdateContainer", arg0) } // UpdateContainer indicates an expected call of UpdateContainer @@ -492,11 +504,9 @@ func (mr *MockPipelineRunMockRecorder) 
UpdateContainer(arg0 interface{}) *gomock } // UpdateMessage mocks base method -func (m *MockPipelineRun) UpdateMessage(arg0 string) error { +func (m *MockPipelineRun) UpdateMessage(arg0 string) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateMessage", arg0) - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "UpdateMessage", arg0) } // UpdateMessage indicates an expected call of UpdateMessage @@ -506,25 +516,21 @@ func (mr *MockPipelineRunMockRecorder) UpdateMessage(arg0 interface{}) *gomock.C } // UpdateResult mocks base method -func (m *MockPipelineRun) UpdateResult(arg0 v1alpha1.Result) error { +func (m *MockPipelineRun) UpdateResult(arg0 v1alpha1.Result, arg1 v10.Time) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateResult", arg0) - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "UpdateResult", arg0, arg1) } // UpdateResult indicates an expected call of UpdateResult -func (mr *MockPipelineRunMockRecorder) UpdateResult(arg0 interface{}) *gomock.Call { +func (mr *MockPipelineRunMockRecorder) UpdateResult(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateResult", reflect.TypeOf((*MockPipelineRun)(nil).UpdateResult), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateResult", reflect.TypeOf((*MockPipelineRun)(nil).UpdateResult), arg0, arg1) } // UpdateRunNamespace mocks base method -func (m *MockPipelineRun) UpdateRunNamespace(arg0 string) error { +func (m *MockPipelineRun) UpdateRunNamespace(arg0 string) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateRunNamespace", arg0) - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "UpdateRunNamespace", arg0) } // UpdateRunNamespace indicates an expected call of UpdateRunNamespace @@ -534,18 +540,17 @@ func (mr *MockPipelineRunMockRecorder) UpdateRunNamespace(arg0 interface{}) *gom } // UpdateState mocks base method -func (m *MockPipelineRun) UpdateState(arg0 v1alpha1.State) (*v1alpha1.StateItem, error) { +func (m *MockPipelineRun) UpdateState(arg0 v1alpha1.State, arg1 v10.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateState", arg0) - ret0, _ := ret[0].(*v1alpha1.StateItem) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret := m.ctrl.Call(m, "UpdateState", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 } // UpdateState indicates an expected call of UpdateState -func (mr *MockPipelineRunMockRecorder) UpdateState(arg0 interface{}) *gomock.Call { +func (mr *MockPipelineRunMockRecorder) UpdateState(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockPipelineRun)(nil).UpdateState), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockPipelineRun)(nil).UpdateState), arg0, arg1) } // MockPipelineRunFetcher is a mock of PipelineRunFetcher interface diff --git a/pkg/k8s/pipelineRun.go b/pkg/k8s/pipelineRun.go index 8feae70d..e3c18cab 100644 --- a/pkg/k8s/pipelineRun.go +++ b/pkg/k8s/pipelineRun.go @@ -31,23 +31,29 @@ type PipelineRun interface { GetPipelineRepoServerURL() (string, error) HasDeletionTimestamp() bool AddFinalizer() error + CommitStatus() ([]*api.StateItem, error) DeleteFinalizerIfExists() error InitState() error - UpdateState(api.State) (*api.StateItem, error) - UpdateResult(api.Result) error - UpdateContainer(*corev1.ContainerState) error + UpdateState(api.State, metav1.Time) error + UpdateResult(api.Result, metav1.Time) + 
UpdateContainer(*corev1.ContainerState) StoreErrorAsMessage(error, string) error - UpdateRunNamespace(string) error - UpdateAuxNamespace(string) error - UpdateMessage(string) error + UpdateRunNamespace(string) + UpdateAuxNamespace(string) + UpdateMessage(string) } type pipelineRun struct { - client stewardv1alpha1.PipelineRunInterface - apiObj *api.PipelineRun - copied bool + client stewardv1alpha1.PipelineRunInterface + apiObj *api.PipelineRun + copied bool + changes []changeFunc + commitRecorders []commitRecorderFunc } +type changeFunc func(*api.PipelineStatus) (commitRecorderFunc, error) +type commitRecorderFunc func() *api.StateItem + // NewPipelineRun creates a managed pipeline run object. // If a factory is provided a new version of the pipelinerun is fetched. // All changes are done on the fetched object. @@ -72,9 +78,11 @@ func NewPipelineRun(apiObj *api.PipelineRun, factory ClientFactory) (PipelineRun return nil, err } return &pipelineRun{ - apiObj: obj, - copied: true, - client: client, + apiObj: obj, + copied: true, + client: client, + changes: []changeFunc{}, + commitRecorders: []commitRecorderFunc{}, }, nil } @@ -141,65 +149,60 @@ func (r *pipelineRun) GetSpec() *api.PipelineSpec { func (r *pipelineRun) InitState() error { r.ensureCopy() klog.V(3).Infof("Init State [%s]", r.String()) - return r.changeStatusAndUpdateSafely(func() error { + return r.changeStatusAndStoreForRetry(func(s *api.PipelineStatus) (commitRecorderFunc, error) { - if r.apiObj.Status.State != api.StateUndefined { - return fmt.Errorf("Cannot initialize multiple times") + if s.State != api.StateUndefined { + return nil, fmt.Errorf("Cannot initialize multiple times") } newStateDetails := api.StateItem{ State: api.StateNew, StartedAt: r.apiObj.ObjectMeta.CreationTimestamp, } - r.apiObj.Status.StateDetails = newStateDetails - r.apiObj.Status.State = api.StateNew - return nil + s.StateDetails = newStateDetails + s.State = api.StateNew + return nil, nil }) } // UpdateState set end time of current (defined) state (A) and store it to the history. // if no current state is defined a new state (A) with cretiontime of the pipelinerun as start time is created. // It also creates a new current state (B) with start time. 
-// Returns the state details of state A -func (r *pipelineRun) UpdateState(state api.State) (*api.StateItem, error) { +func (r *pipelineRun) UpdateState(state api.State, ts metav1.Time) error { if r.apiObj.Status.State == api.StateUndefined { - return nil, fmt.Errorf("Cannot update uninitialize state") + if err := r.InitState(); err != nil { + return err + } } r.ensureCopy() klog.V(3).Infof("Update State to %s [%s]", state, r.String()) - now := metav1.Now() oldStateDetails := r.apiObj.Status.StateDetails - err := r.changeStatusAndUpdateSafely(func() error { - - currentStateDetails := r.apiObj.Status.StateDetails + return r.changeStatusAndStoreForRetry(func(s *api.PipelineStatus) (commitRecorderFunc, error) { + currentStateDetails := s.StateDetails if currentStateDetails.State != oldStateDetails.State { - return fmt.Errorf("State cannot be updated as it was changed concurrently from %q to %q", oldStateDetails.State, currentStateDetails.State) + return nil, fmt.Errorf("State cannot be updated as it was changed concurrently from %q to %q", oldStateDetails.State, currentStateDetails.State) } if state == api.StatePreparing { - r.apiObj.Status.StartedAt = &now + s.StartedAt = &ts } - currentStateDetails.FinishedAt = now - his := r.apiObj.Status.StateHistory + currentStateDetails.FinishedAt = ts + his := s.StateHistory his = append(his, currentStateDetails) - newStateDetails := api.StateItem{State: state, StartedAt: now} + commitRecorderFunc := func() *api.StateItem { + return &currentStateDetails + } + newStateDetails := api.StateItem{State: state, StartedAt: ts} if state == api.StateFinished { - newStateDetails.FinishedAt = now + newStateDetails.FinishedAt = ts } - r.apiObj.Status.StateDetails = newStateDetails - r.apiObj.Status.StateHistory = his - r.apiObj.Status.State = state - return nil + s.StateDetails = newStateDetails + s.StateHistory = his + s.State = state + return commitRecorderFunc, nil }) - - if err != nil { - return nil, err - } - his := r.apiObj.Status.StateHistory - hisLen := len(his) - return &his[hisLen-1], nil } // String returns the full qualified name of the pipeline run @@ -208,25 +211,24 @@ func (r *pipelineRun) String() string { } // UpdateResult of the pipeline run -func (r *pipelineRun) UpdateResult(result api.Result) error { +func (r *pipelineRun) UpdateResult(result api.Result, ts metav1.Time) { r.ensureCopy() - return r.changeStatusAndUpdateSafely(func() error { - r.apiObj.Status.Result = result - now := metav1.Now() - r.apiObj.Status.FinishedAt = &now - return nil + r.mustChangeStatusAndStoreForRetry(func(s *api.PipelineStatus) (commitRecorderFunc, error) { + s.Result = result + s.FinishedAt = &ts + return nil, nil }) } // UpdateContainer ...
-func (r *pipelineRun) UpdateContainer(c *corev1.ContainerState) error { +func (r *pipelineRun) UpdateContainer(c *corev1.ContainerState) { if c == nil { - return nil + return } r.ensureCopy() - return r.changeStatusAndUpdateSafely(func() error { - r.apiObj.Status.Container = *c - return nil + r.mustChangeStatusAndStoreForRetry(func(s *api.PipelineStatus) (commitRecorderFunc, error) { + s.Container = *c + return nil, nil }) } @@ -235,44 +237,44 @@ func (r *pipelineRun) StoreErrorAsMessage(err error, message string) error { if err != nil { text := fmt.Sprintf("ERROR: %s [%s]: %s", utils.Trim(message), r.String(), err.Error()) klog.V(3).Infof(text) - return r.UpdateMessage(text) + r.UpdateMessage(text) } return nil } // UpdateMessage stores string as message in the status -func (r *pipelineRun) UpdateMessage(message string) error { +func (r *pipelineRun) UpdateMessage(message string) { r.ensureCopy() - return r.changeStatusAndUpdateSafely(func() error { - old := r.apiObj.Status.Message + r.mustChangeStatusAndStoreForRetry(func(s *api.PipelineStatus) (commitRecorderFunc, error) { + old := s.Message if old != "" { - his := r.apiObj.Status.History + his := s.History his = append(his, old) - r.apiObj.Status.History = his + s.History = his } - r.apiObj.Status.Message = utils.Trim(message) - r.apiObj.Status.MessageShort = utils.ShortenMessage(message, 100) - return nil + s.Message = utils.Trim(message) + s.MessageShort = utils.ShortenMessage(message, 100) + return nil, nil }) } // UpdateRunNamespace overrides the namespace in which the builds happens -func (r *pipelineRun) UpdateRunNamespace(ns string) error { +func (r *pipelineRun) UpdateRunNamespace(ns string) { r.ensureCopy() - return r.changeStatusAndUpdateSafely(func() error { - r.apiObj.Status.Namespace = ns - return nil + r.mustChangeStatusAndStoreForRetry(func(s *api.PipelineStatus) (commitRecorderFunc, error) { + s.Namespace = ns + return nil, nil }) } // UpdateAuxNamespace overrides the namespace hosting auxiliary services // for the pipeline run. -func (r *pipelineRun) UpdateAuxNamespace(ns string) error { +func (r *pipelineRun) UpdateAuxNamespace(ns string) { r.ensureCopy() - return r.changeStatusAndUpdateSafely(func() error { - r.apiObj.Status.AuxiliaryNamespace = ns - return nil + r.mustChangeStatusAndStoreForRetry(func(s *api.PipelineStatus) (commitRecorderFunc, error) { + s.AuxiliaryNamespace = ns + return nil, nil }) } @@ -301,6 +303,9 @@ func (r *pipelineRun) DeleteFinalizerIfExists() error { } func (r *pipelineRun) updateFinalizers(finalizerList []string) error { + if len(r.changes) > 0 { + return fmt.Errorf("cannot add finalizers when we have uncommitted status updates") + } if r.client == nil { panic(fmt.Errorf("No factory provided to store updates [%s]", r.String())) } @@ -319,7 +324,31 @@ func (r *pipelineRun) updateFinalizers(finalizerList []string) error { return nil } -// changeStatusAndUpdateSafely executes `change` and writes the +// mustChangeStatusAndStoreForRetry calls changeStatusAndStoreForRetry and +// panics in case of an error. +func (r *pipelineRun) mustChangeStatusAndStoreForRetry(change changeFunc) { + err := r.changeStatusAndStoreForRetry(change) + if err != nil { + panic(err) + } +} + +// changeStatusAndStoreForRetry receives a function applying changes to pipelinerun.Status. +// The function gets executed on the current memory representation of the pipeline run +// and is remembered so that it can be re-applied later in case of a retry. The change function +// must only apply changes to pipelinerun.Status.
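To make the new update model concrete before the implementation in the next hunk: a change function mutates only the in-memory status and is recorded, so that a later commit can re-apply all recorded changes to a freshly fetched object and persist them with a single status update. The following minimal, self-contained sketch (hypothetical types and names, not part of this patch) illustrates the pattern:

package main

import "fmt"

// Status stands in for api.PipelineStatus.
type Status struct {
	State  string
	Result string
}

type changeFunc func(*Status) error

type run struct {
	status  Status
	changes []changeFunc // recorded for re-application on a retry
}

// change applies c to the in-memory status and records it for the commit.
func (r *run) change(c changeFunc) error {
	if err := c(&r.status); err != nil {
		return err
	}
	r.changes = append(r.changes, c)
	return nil
}

// commit re-applies all recorded changes to a freshly fetched status
// (as CommitStatus does inside retry.RetryOnConflict) and would then
// issue a single UpdateStatus call, so observers never see a state
// without its matching result.
func (r *run) commit(fetch func() Status) error {
	fresh := fetch()
	for _, c := range r.changes {
		if err := c(&fresh); err != nil {
			return err
		}
	}
	r.status = fresh
	r.changes = nil
	return nil // the single API update would happen here
}

func main() {
	r := &run{}
	r.change(func(s *Status) error { s.Result = "success"; return nil })
	r.change(func(s *Status) error { s.State = "finished"; return nil })
	r.commit(func() Status { return Status{} })
	fmt.Printf("%+v\n", r.status) // {State:finished Result:success}
}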
+// +func (r *pipelineRun) changeStatusAndStoreForRetry(change changeFunc) error { + commitRecorder, err := change(r.GetStatus()) + if err == nil { + r.changes = append(r.changes, change) + r.commitRecorders = append(r.commitRecorders, commitRecorder) + } + + return err +} + +// CommitStatus applies each recorded `change` and writes the // status of the underlying PipelineRun object to storage afterwards. // `change` is expected to mutate only the status of the underlying // object, not more. @@ -338,13 +367,17 @@ func (r *pipelineRun) updateFinalizers(finalizerList []string) error { // that change _gets_ persisted in case there's _no_ update conflict, but // gets _lost_ in case there _is_ an update conflict! This is hard to find // by tests, as those typically do not encounter update conflicts. -func (r *pipelineRun) changeStatusAndUpdateSafely(change func() error) error { +func (r *pipelineRun) CommitStatus() ([]*api.StateItem, error) { if r.client == nil { panic(fmt.Errorf("No factory provided to store updates [%s]", r.String())) } + if len(r.changes) == 0 { + return nil, nil + } + isRetry := false - var changeError error = nil + var changeError error err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { var err error @@ -356,15 +389,19 @@ func (r *pipelineRun) changeStatusAndUpdateSafely(change func() error) error { } r.apiObj = new r.copied = true + r.commitRecorders = []commitRecorderFunc{} + var commitRecorder func() *api.StateItem + for _, change := range r.changes { + commitRecorder, changeError = change(r.GetStatus()) + if changeError != nil { + return nil + } + r.commitRecorders = append(r.commitRecorders, commitRecorder) + } } else { defer func() { isRetry = true }() } - changeError = change() - if changeError != nil { - return nil - } - result, err := r.client.UpdateStatus(r.apiObj) if err == nil { r.apiObj = result @@ -372,11 +409,17 @@ func (r *pipelineRun) changeStatusAndUpdateSafely(change func() error) error { } return err }) + r.changes = []changeFunc{} if changeError != nil { - return changeError + return nil, changeError } - - return errors.Wrapf(err, "failed to update status [%s]", r.String()) + result := []*api.StateItem{} + for _, recorder := range r.commitRecorders { + if recorder != nil { + result = append(result, recorder()) + } + } + return result, errors.Wrapf(err, "failed to update status [%s]", r.String()) } func (r *pipelineRun) ensureCopy() { diff --git a/pkg/k8s/pipelineRun_test.go b/pkg/k8s/pipelineRun_test.go index 72cb86ad..4f3a46fe 100644 --- a/pkg/k8s/pipelineRun_test.go +++ b/pkg/k8s/pipelineRun_test.go @@ -96,7 +96,7 @@ func Test_NewPipelineRun_IsCopy(t *testing.T) { // VERIFY assert.NilError(t, err) - examinee.UpdateResult(api.ResultSuccess) + examinee.UpdateResult(api.ResultSuccess, metav1.Now()) assert.Equal(t, api.ResultUndefined, run.Status.Result) assert.Equal(t, api.ResultSuccess, examinee.GetStatus().Result) } @@ -147,6 +147,7 @@ func Test_pipelineRun_StoreErrorAsMessage(t *testing.T) { // EXERCISE examinee.StoreErrorAsMessage(errorToStore, message) + examinee.CommitStatus() // VERIFY client := factory.StewardV1alpha1().PipelineRuns(ns1) @@ -232,16 +233,6 @@ func Test_pipelineRun_InitState(t *testing.T) { func Test_pipelineRun_InitState_ReturnsErrorIfCalledMultipleTimes(t *testing.T) { t.Parallel() - // SETUP - pipelineRun := newPipelineRunWithEmptySpec(ns1, run1) - creationTimestamp := metav1.Now() - pipelineRun.ObjectMeta.CreationTimestamp = creationTimestamp - factory := fake.NewClientFactory(pipelineRun) - examinee, err :=
NewPipelineRun(pipelineRun, factory) - assert.NilError(t, err) - resultErr := examinee.InitState() - assert.NilError(t, resultErr) - for _, oldState := range []api.State{ api.StateNew, api.StatePreparing, @@ -250,19 +241,33 @@ func Test_pipelineRun_InitState_ReturnsErrorIfCalledMultipleTimes(t *testing.T) api.StateCleaning, api.StateFinished, } { - _, resultErr = examinee.UpdateState(oldState) - assert.NilError(t, resultErr) - // EXERCISE - resultErr = examinee.InitState() + t.Run(string(oldState), func(t *testing.T) { + // SETUP + pipelineRun := newPipelineRunWithEmptySpec(ns1, run1) + creationTimestamp := metav1.Now() + pipelineRun.ObjectMeta.CreationTimestamp = creationTimestamp + factory := fake.NewClientFactory(pipelineRun) + + examinee, err := NewPipelineRun(pipelineRun, factory) + assert.NilError(t, err) + resultErr := examinee.InitState() + assert.NilError(t, resultErr) + + resultErr = examinee.UpdateState(oldState, metav1.Now()) + assert.NilError(t, resultErr) + + // EXERCISE + resultErr = examinee.InitState() - // VERIFY - assert.Error(t, resultErr, "Cannot initialize multiple times") - assert.Equal(t, oldState, examinee.GetStatus().State) + // VERIFY + assert.Error(t, resultErr, "Cannot initialize multiple times") + assert.Equal(t, oldState, examinee.GetStatus().State) + }) } } -func Test_pipelineRun_UpdateState_FailsWithoutInit(t *testing.T) { +func Test_pipelineRun_UpdateState_HasAutomaticInitialization(t *testing.T) { t.Parallel() // SETUP @@ -272,10 +277,21 @@ func Test_pipelineRun_UpdateState_FailsWithoutInit(t *testing.T) { assert.NilError(t, err) // EXERCISE - _, resultErr := examinee.UpdateState(api.StatePreparing) + resultErr := examinee.UpdateState(api.StatePreparing, metav1.Now()) + assert.NilError(t, resultErr) + results, resultErr := examinee.CommitStatus() // VERIFY - assert.Error(t, resultErr, "Cannot update uninitialize state") + assert.NilError(t, resultErr) + + assert.Equal(t, api.StatePreparing, examinee.GetStatus().State) + assert.Equal(t, 1, len(examinee.GetStatus().StateHistory)) + assert.Equal(t, api.StateNew, examinee.GetStatus().StateHistory[0].State) + startedAt := examinee.GetStatus().StartedAt + assert.Assert(t, !startedAt.IsZero()) + assert.Equal(t, *startedAt, examinee.GetStatus().StateHistory[0].FinishedAt) + assert.Equal(t, api.StateNew, results[0].State) + assert.Equal(t, *startedAt, results[0].FinishedAt) } func Test_pipelineRun_UpdateState_AfterFirstCall(t *testing.T) { @@ -292,7 +308,9 @@ func Test_pipelineRun_UpdateState_AfterFirstCall(t *testing.T) { assert.NilError(t, err) // EXERCISE - result, resultErr := examinee.UpdateState(api.StatePreparing) + resultErr := examinee.UpdateState(api.StatePreparing, metav1.Now()) + assert.NilError(t, resultErr) + results, resultErr := examinee.CommitStatus() // VERIFY assert.NilError(t, resultErr) @@ -305,9 +323,9 @@ func Test_pipelineRun_UpdateState_AfterFirstCall(t *testing.T) { assert.Assert(t, !startedAt.IsZero()) assert.Equal(t, *startedAt, examinee.GetStatus().StateHistory[0].FinishedAt) - assert.Equal(t, api.StateNew, result.State) - assert.Equal(t, creationTimestamp, result.StartedAt) - assert.Equal(t, *startedAt, result.FinishedAt) + assert.Equal(t, api.StateNew, results[0].State) + assert.Equal(t, creationTimestamp, results[0].StartedAt) + assert.Equal(t, *startedAt, results[0].FinishedAt) } func Test_pipelineRun_UpdateState_AfterSecondCall(t *testing.T) { @@ -320,12 +338,14 @@ func Test_pipelineRun_UpdateState_AfterSecondCall(t *testing.T) { assert.NilError(t, err) err = examinee.InitState() 
assert.NilError(t, err) - _, err = examinee.UpdateState(api.StatePreparing) // first call + err = examinee.UpdateState(api.StatePreparing, metav1.Now()) // first call assert.NilError(t, err) factory.Sleep("let time elapse to check timestamps afterwards") // EXERCISE - result, resultErr := examinee.UpdateState(api.StateRunning) // second call + resultErr := examinee.UpdateState(api.StateRunning, metav1.Now()) // second call + assert.NilError(t, resultErr) + results, resultErr := examinee.CommitStatus() // VERIFY assert.NilError(t, resultErr) @@ -339,9 +359,10 @@ func Test_pipelineRun_UpdateState_AfterSecondCall(t *testing.T) { assert.Assert(t, !start.IsZero()) assert.Assert(t, factory.CheckTimeOrder(start, end)) - assert.Equal(t, api.StatePreparing, result.State) - start = result.StartedAt - end = result.FinishedAt + assert.Equal(t, api.StateNew, results[0].State) + assert.Equal(t, api.StatePreparing, results[1].State) + start = results[1].StartedAt + end = results[1].FinishedAt assert.Assert(t, !start.IsZero()) assert.Assert(t, factory.CheckTimeOrder(start, end)) @@ -357,12 +378,14 @@ func Test_pipelineRun_UpdateStateToFinished_HistoryIfUpdateStateCalledBefore(t * assert.NilError(t, err) err = examinee.InitState() assert.NilError(t, err) - _, err = examinee.UpdateState(api.StatePreparing) // called before + err = examinee.UpdateState(api.StatePreparing, metav1.Now()) // called before assert.NilError(t, err) factory.Sleep("let time elapse to check timestamps afterwards") // EXERCISE - examinee.UpdateState(api.StateFinished) + examinee.UpdateState(api.StateFinished, metav1.Now()) + _, err = examinee.CommitStatus() + assert.NilError(t, err) // VERIFY status := examinee.GetStatus() @@ -388,30 +411,13 @@ func Test_pipelineRun_UpdateResult(t *testing.T) { assert.Assert(t, examinee.GetStatus().FinishedAt.IsZero()) // EXERCISE - examinee.UpdateResult(api.ResultSuccess) + examinee.UpdateResult(api.ResultSuccess, metav1.Now()) // VERIFY status := examinee.GetStatus() assert.Equal(t, api.ResultSuccess, status.Result) assert.Assert(t, !examinee.GetStatus().FinishedAt.IsZero()) } - -func Test_pipelineRun_UpdateResult_PanicsIfNoClientFactory(t *testing.T) { - t.Parallel() - - // SETUP - pipelineRun := newPipelineRunWithEmptySpec(ns1, run1) - examinee, err := NewPipelineRun(pipelineRun, nil /* client factory */) - assert.NilError(t, err) - - // EXERCISE and VERIFY - assert.Assert(t, cmp.Panics( - func() { - examinee.UpdateResult(api.ResultSuccess) - }, - )) -} - func Test_pipelineRun_GetPipelineRepoServerURL_CorrectURLs(t *testing.T) { t.Parallel() @@ -482,13 +488,14 @@ func Test_pipelineRun_UpdateState_PropagatesError(t *testing.T) { factory.StewardClientset().PrependReactor("update", "*", fake.NewErrorReactor(expectedError)) // EXCERCISE - _, err = examinee.UpdateState(api.StateWaiting) + examinee.UpdateState(api.StateWaiting, metav1.Now()) + _, err = examinee.CommitStatus() // VERIFY assert.Assert(t, err != nil) } -func Test_pipelineRun_UpdateState_RetriesOnConflict(t *testing.T) { +func Test_pipelineRun_CommitStatus_RetriesOnConflict(t *testing.T) { t.Parallel() // SETUP @@ -511,9 +518,11 @@ func Test_pipelineRun_UpdateState_RetriesOnConflict(t *testing.T) { assert.NilError(t, err) err = examinee.InitState() assert.NilError(t, err) + resultErr := examinee.UpdateState(api.StateWaiting, metav1.Now()) + assert.NilError(t, resultErr) // EXCERCISE - _, resultErr := examinee.UpdateState(api.StateWaiting) + _, resultErr = examinee.CommitStatus() // VERIFY assert.NilError(t, resultErr) @@ -521,26 +530,6 @@ 
func Test_pipelineRun_UpdateState_RetriesOnConflict(t *testing.T) { assert.Assert(t, count == 3) } -func Test_pipelineRun_changeStatusAndUpdateSafely_PanicsIfNoClientFactory(t *testing.T) { - t.Parallel() - - // SETUP - run := newPipelineRunWithEmptySpec(ns1, run1) - examinee, err := NewPipelineRun(run, nil /* client factory */) - assert.NilError(t, err) - - // EXERCISE and VERIFY - assert.Assert(t, cmp.Panics( - func() { - changeFunc := func() error { - return nil - } - - examinee.(*pipelineRun).changeStatusAndUpdateSafely(changeFunc) - }, - )) -} - func Test_pipelineRun_changeStatusAndUpdateSafely_SetsUpdateResult_IfNoConflict(t *testing.T) { t.Parallel() @@ -563,13 +552,14 @@ func Test_pipelineRun_changeStatusAndUpdateSafely_SetsUpdateResult_IfNoConflict( } changeCallCount := 0 - changeFunc := func() error { + changeFunc := func(*api.PipelineStatus) (commitRecorderFunc, error) { changeCallCount++ - return nil + return nil, nil } // EXCERCISE - resultErr := examinee.changeStatusAndUpdateSafely(changeFunc) + examinee.changeStatusAndStoreForRetry(changeFunc) + _, resultErr := examinee.CommitStatus() // VERIFY assert.NilError(t, resultErr) @@ -594,9 +584,9 @@ func Test_pipelineRun_changeStatusAndUpdateSafely_NoUpdateOnChangeErrorInFirstAt changeError := fmt.Errorf("ChangeError1") changeCallCount := 0 - changeFunc := func() error { + changeFunc := func(*api.PipelineStatus) (commitRecorderFunc, error) { changeCallCount++ - return changeError + return nil, changeError } examinee := &pipelineRun{ @@ -606,7 +596,7 @@ func Test_pipelineRun_changeStatusAndUpdateSafely_NoUpdateOnChangeErrorInFirstAt } // EXCERCISE - resultErr := examinee.changeStatusAndUpdateSafely(changeFunc) + resultErr := examinee.changeStatusAndStoreForRetry(changeFunc) // VERIFY assert.Error(t, resultErr, changeError.Error()) @@ -640,13 +630,14 @@ func Test_pipelineRun_changeStatusAndUpdateSafely_SetsUpdateResult_IfConflict(t } changeCallCount := 0 - changeFunc := func() error { + changeFunc := func(*api.PipelineStatus) (commitRecorderFunc, error) { changeCallCount++ - return nil + return nil, nil } // EXCERCISE - resultErr := examinee.changeStatusAndUpdateSafely(changeFunc) + examinee.changeStatusAndStoreForRetry(changeFunc) + _, resultErr := examinee.CommitStatus() // VERIFY assert.NilError(t, resultErr) @@ -684,13 +675,14 @@ func Test_pipelineRun_changeStatusAndUpdateSafely_FailsAfterTooManyConflicts(t * } changeCallCount := 0 - changeFunc := func() error { + changeFunc := func(*api.PipelineStatus) (commitRecorderFunc, error) { changeCallCount++ - return nil + return nil, nil } // EXCERCISE - resultErr := examinee.changeStatusAndUpdateSafely(changeFunc) + examinee.changeStatusAndStoreForRetry(changeFunc) + _, resultErr := examinee.CommitStatus() // VERIFY assert.Assert(t, errors.Is(resultErr, errorOnUpdate)) @@ -729,13 +721,14 @@ func Test_pipelineRun_changeStatusAndUpdateSafely_ReturnsErrorIfFetchFailed(t *t } changeCallCount := 0 - changeFunc := func() error { + changeFunc := func(*api.PipelineStatus) (commitRecorderFunc, error) { changeCallCount++ - return nil + return nil, nil } // EXCERCISE - resultErr := examinee.changeStatusAndUpdateSafely(changeFunc) + examinee.changeStatusAndStoreForRetry(changeFunc) + _, resultErr := examinee.CommitStatus() // VERIFY assert.Assert(t, errors.Is(resultErr, errorOnGet)) @@ -745,18 +738,20 @@ func Test_pipelineRun_changeStatusAndUpdateSafely_ReturnsErrorIfFetchFailed(t *t assert.Equal(t, changeCallCount, 1) } -func Test_pipelineRun_updateFinalizers_PanicsIfNoClientFactory(t 
*testing.T) { +func Test_pipelineRun_CommitStatus_PanicsIfNoClientFactory(t *testing.T) { t.Parallel() // SETUP run := newPipelineRunWithEmptySpec(ns1, run1) examinee, err := NewPipelineRun(run, nil /* client factory */) assert.NilError(t, err) + examinee2 := examinee.(*pipelineRun) + examinee2.changeStatusAndStoreForRetry(func(*api.PipelineStatus) (commitRecorderFunc, error) { /* foo */ return nil, nil }) // EXERCISE and VERIFY assert.Assert(t, cmp.Panics( func() { - examinee.(*pipelineRun).updateFinalizers([]string{"dummy"}) + examinee2.CommitStatus() }, )) } diff --git a/pkg/runctl/controller.go b/pkg/runctl/controller.go index de16d193..00957656 100644 --- a/pkg/runctl/controller.go +++ b/pkg/runctl/controller.go @@ -34,7 +34,7 @@ const kind = "PipelineRuns" // Used for logging (control loop) "still alive" messages var heartbeatIntervalSeconds int64 = 60 -var heartbeatTimer int64 = 0 +var heartbeatTimer int64 // Interval for histogram creation set to prometheus default scrape interval var meteringInterval = time.Minute * 1 @@ -205,24 +205,13 @@ func (c *Controller) processNextWorkItem() bool { return true } -func (c *Controller) changeState(pipelineRun k8s.PipelineRun, state api.State) error { - start := time.Now() - oldState, err := pipelineRun.UpdateState(state) +func (c *Controller) changeState(pipelineRun k8s.PipelineRun, state api.State, ts metav1.Time) error { + err := pipelineRun.UpdateState(state, ts) if err != nil { klog.V(3).Infof("Failed to UpdateState of [%s] to %q: %q", pipelineRun.String(), state, err.Error()) return err } - end := time.Now() - elapsed := end.Sub(start) - c.metrics.ObserveUpdateDurationByType("UpdateState", elapsed) - - if oldState != nil { - err := c.metrics.ObserveDurationByState(oldState) - if err != nil { - klog.Errorf("Failed to measure state '%+v': '%s'", oldState, err) - } - } return nil } @@ -270,7 +259,7 @@ func (c *Controller) syncHandler(key string) error { if pipelineRunAPIObj == nil { return nil } - // fast exit + // fast exit - no finalizer cleanup needed if pipelineRunAPIObj.Status.State == api.StateFinished && !utils.StringSliceContains(pipelineRunAPIObj.ObjectMeta.Finalizers, k8s.FinalizerName) { return nil } @@ -286,39 +275,24 @@ func (c *Controller) syncHandler(key string) error { return nil } - // Init state when undefined - if pipelineRun.GetStatus().State == api.StateUndefined { - err = pipelineRun.InitState() - if err != nil { - return err - } + // fast exit with finalizer cleanup + if pipelineRun.GetStatus().State == api.StateFinished { + return pipelineRun.DeleteFinalizerIfExists() } - // Check if object has deletion timestamp - // If not, try to add finalizer if missing + // Check if object has deletion timestamp ... if pipelineRun.HasDeletionTimestamp() { runManager := c.createRunManager(pipelineRun) - if pipelineRun.GetStatus().State == api.StateFinished { - return pipelineRun.DeleteFinalizerIfExists() - } err = runManager.Cleanup(pipelineRun) - - if err == nil { - pipelineRun.UpdateResult(api.ResultDeleted) - err = c.finish(pipelineRun) - if err == nil { - c.metrics.CountResult(api.ResultDeleted) - } + if err != nil { + c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonCleaningFailed, err.Error()) + return err } - return err + return c.updateStateAndResult(pipelineRun, api.StateFinished, api.ResultDeleted, metav1.Now()) } + // ... 
if not, try to add finalizer if missing pipelineRun.AddFinalizer() - // Finished and no deletion timestamp, no need to process anything further - if pipelineRun.GetStatus().State == api.StateFinished { - return nil - } - // Check if pipeline run is aborted if err := c.handleAborted(pipelineRun); err != nil { return err @@ -326,12 +300,21 @@ func (c *Controller) syncHandler(key string) error { // As soon as we have a result we can cleanup if pipelineRun.GetStatus().Result != api.ResultUndefined && pipelineRun.GetStatus().State != api.StateCleaning { - err = c.changeState(pipelineRun, api.StateCleaning) + err = c.changeState(pipelineRun, api.StateCleaning, metav1.Now()) if err != nil { klog.V(1).Infof("WARN: change state to cleaning failed with: %s", err.Error()) } } + // Init state when undefined + if pipelineRun.GetStatus().State == api.StateUndefined { + + err = pipelineRun.InitState() + if err != nil { + return err + } + } + if pipelineRun.GetStatus().State == api.StateNew { maintenanceMode, err := c.isMaintenanceMode() if err != nil { @@ -343,7 +326,7 @@ func (c *Controller) syncHandler(key string) error { // Return error that the pipeline stays in the queue and will be processed after switching back to normal mode. return err } - if err = c.changeState(pipelineRun, api.StatePreparing); err != nil { + if err = c.changeAndCommitStateAndMeter(pipelineRun, api.StatePreparing, metav1.Now()); err != nil { return err } c.metrics.CountStart() @@ -351,103 +334,118 @@ func (c *Controller) syncHandler(key string) error { runManager := c.createRunManager(pipelineRun) - // the configuration should be loaded once per sync to avoid inconsistencies - // in case of concurrent configuration changes - pipelineRunsConfig, err := c.loadPipelineRunsConfig() - if err != nil { - if serrors.IsRecoverable(err) { - c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonLoadPipelineRunsConfigFailed, err.Error()) - return err - } - pipelineRun.UpdateResult(api.ResultErrorInfra) - pipelineRun.StoreErrorAsMessage(err, "failed to load configuration for pipeline runs") - c.metrics.CountResult(pipelineRun.GetStatus().Result) - runManager.Cleanup(pipelineRun) - return c.finish(pipelineRun) - } - // Process pipeline run based on current state switch state := pipelineRun.GetStatus().State; state { case api.StatePreparing: - err = runManager.Start(pipelineRun, pipelineRunsConfig) + // the configuration should be loaded once per sync to avoid inconsistencies + // in case of concurrent configuration changes + pipelineRunsConfig, err := c.loadPipelineRunsConfig() + if err != nil { + return c.onGetRunError(pipelineRunAPIObj, pipelineRun, err, api.StateFinished, api.ResultErrorInfra, "failed to load configuration for pipeline runs") + } + namespace, auxNamespace, err := runManager.Start(pipelineRun, pipelineRunsConfig) if err != nil { c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonPreparingFailed, err.Error()) resultClass := serrors.GetClass(err) // In case we have a result we can cleanup. Otherwise we retry in the next iteration. 
if resultClass != api.ResultUndefined { pipelineRun.UpdateMessage(err.Error()) - pipelineRun.UpdateResult(resultClass) - if errClean := c.changeState(pipelineRun, api.StateCleaning); errClean != nil { - return errClean - } pipelineRun.StoreErrorAsMessage(err, "preparing failed") - c.metrics.CountResult(pipelineRun.GetStatus().Result) - return nil + return c.updateStateAndResult(pipelineRun, api.StateCleaning, resultClass, metav1.Now()) } return err } - if err = c.changeState(pipelineRun, api.StateWaiting); err != nil { + + pipelineRun.UpdateRunNamespace(namespace) + pipelineRun.UpdateAuxNamespace(auxNamespace) + + if err = c.changeAndCommitStateAndMeter(pipelineRun, api.StateWaiting, metav1.Now()); err != nil { return err } case api.StateWaiting: run, err := runManager.GetRun(pipelineRun) if err != nil { - c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonWaitingFailed, err.Error()) - if serrors.IsRecoverable(err) { - return err - } - if errClean := c.changeState(pipelineRun, api.StateCleaning); errClean != nil { - return errClean - } - pipelineRun.StoreErrorAsMessage(err, "waiting failed") - pipelineRun.UpdateResult(api.ResultErrorInfra) - c.metrics.CountResult(api.ResultErrorInfra) - return nil + return c.onGetRunError(pipelineRunAPIObj, pipelineRun, err, api.StateCleaning, api.ResultErrorInfra, "waiting failed") } started := run.GetStartTime() if started != nil { - if err = c.changeState(pipelineRun, api.StateRunning); err != nil { + if err := c.changeAndCommitStateAndMeter(pipelineRun, api.StateRunning, *started); err != nil { return err } } case api.StateRunning: run, err := runManager.GetRun(pipelineRun) if err != nil { - c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonRunningFailed, err.Error()) - if serrors.IsRecoverable(err) { - return err - } - if errClean := c.changeState(pipelineRun, api.StateCleaning); errClean != nil { - return errClean - } - pipelineRun.StoreErrorAsMessage(err, "running failed") - return nil + return c.onGetRunError(pipelineRunAPIObj, pipelineRun, err, api.StateCleaning, api.ResultErrorInfra, "running failed") } containerInfo := run.GetContainerInfo() pipelineRun.UpdateContainer(containerInfo) if finished, result := run.IsFinished(); finished { - msg := run.GetMessage() - pipelineRun.UpdateMessage(msg) - pipelineRun.UpdateResult(result) - if err = c.changeState(pipelineRun, api.StateCleaning); err != nil { - return err - } - c.metrics.CountResult(result) + pipelineRun.UpdateMessage(run.GetMessage()) + return c.updateStateAndResult(pipelineRun, api.StateCleaning, result, *run.GetCompletionTime()) } + // commit container update + c.commitStatusAndMeter(pipelineRun) + case api.StateCleaning: err = runManager.Cleanup(pipelineRun) - return c.finish(pipelineRun) + if err != nil { + c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonCleaningFailed, err.Error()) + } + if err := c.changeAndCommitStateAndMeter(pipelineRun, api.StateFinished, metav1.Now()); err != nil { + return err + } + return pipelineRun.DeleteFinalizerIfExists() default: klog.V(2).Infof("Skip PipelineRun with state %s", pipelineRun.GetStatus().State) } return nil } -func (c *Controller) finish(pipelineRun k8s.PipelineRun) error { - if err := c.changeState(pipelineRun, api.StateFinished); err != nil { +func (c *Controller) onGetRunError(pipelineRunAPIObj *api.PipelineRun, pipelineRun k8s.PipelineRun, err error, state api.State, result api.Result, message string) error { + c.recorder.Event(pipelineRunAPIObj, 
corev1.EventTypeWarning, api.EventReasonRunningFailed, err.Error()) + if serrors.IsRecoverable(err) { + return err + } + pipelineRun.StoreErrorAsMessage(err, message) + return c.updateStateAndResult(pipelineRun, state, result, metav1.Now()) +} + +func (c *Controller) changeAndCommitStateAndMeter(pipelineRun k8s.PipelineRun, state api.State, ts metav1.Time) error { + if err := c.changeState(pipelineRun, state, ts); err != nil { + return err + } + return c.commitStatusAndMeter(pipelineRun) +} + +func (c *Controller) updateStateAndResult(pipelineRun k8s.PipelineRun, state api.State, result api.Result, ts metav1.Time) error { + pipelineRun.UpdateResult(result, ts) + if err := c.changeAndCommitStateAndMeter(pipelineRun, state, ts); err != nil { + return err + } + c.metrics.CountResult(pipelineRun.GetStatus().Result) + if state == api.StateFinished { + return pipelineRun.DeleteFinalizerIfExists() + } + return nil +} + +func (c *Controller) commitStatusAndMeter(pipelineRun k8s.PipelineRun) error { + start := time.Now() + finishedStates, err := pipelineRun.CommitStatus() + if err != nil { return err } - return pipelineRun.DeleteFinalizerIfExists() + end := time.Now() + elapsed := end.Sub(start) + c.metrics.ObserveUpdateDurationByType("UpdateState", elapsed) + for _, finishedState := range finishedStates { + err := c.metrics.ObserveDurationByState(finishedState) + if err != nil { + klog.Errorf("Failed to measure state '%+v': '%s'", finishedState, err) + } + } + return nil } // handleAborted checks if pipeline run should be aborted. @@ -457,8 +455,7 @@ func (c *Controller) handleAborted(pipelineRun k8s.PipelineRun) error { intent := pipelineRun.GetSpec().Intent if intent == api.IntentAbort && pipelineRun.GetStatus().Result == api.ResultUndefined { pipelineRun.UpdateMessage("Aborted") - pipelineRun.UpdateResult(api.ResultAborted) - return c.changeState(pipelineRun, api.StateCleaning) + return c.updateStateAndResult(pipelineRun, api.StateCleaning, api.ResultAborted, metav1.Now()) } return nil } diff --git a/pkg/runctl/controller_test.go b/pkg/runctl/controller_test.go index ba1b2319..761812da 100644 --- a/pkg/runctl/controller_test.go +++ b/pkg/runctl/controller_test.go @@ -197,9 +197,6 @@ func Test_Controller_syncHandler_delete(t *testing.T) { } { expectedStateOnError := currentState - if currentState == api.StateUndefined { - expectedStateOnError = api.StateNew - } for _, test := range []struct { name string @@ -361,7 +358,7 @@ func Test_Controller_syncHandler_mock_start(t *testing.T) { name: "new_ok", pipelineSpec: api.PipelineSpec{}, runManagerExpectation: func(rm *runmocks.MockManager, run *runmocks.MockRun) { - rm.EXPECT().Start(gomock.Any(), gomock.Any()).Return(nil) + rm.EXPECT().Start(gomock.Any(), gomock.Any()).Return("", "", nil) }, pipelineRunsConfigStub: newEmptyRunsConfig, isMaintenanceModeStub: newIsMaintenanceModeStub(false, nil), @@ -400,11 +397,9 @@ func Test_Controller_syncHandler_mock_start(t *testing.T) { expectedError: fmt.Errorf("pipeline execution is paused while the system is in maintenance mode"), }, { - name: "new_get_cofig_fail_not_recoverable", - pipelineSpec: api.PipelineSpec{}, - runManagerExpectation: func(rm *runmocks.MockManager, run *runmocks.MockRun) { - rm.EXPECT().Cleanup(gomock.Any()).Return(nil) - }, + name: "new_get_cofig_fail_not_recoverable", + pipelineSpec: api.PipelineSpec{}, + runManagerExpectation: func(rm *runmocks.MockManager, run *runmocks.MockRun) {}, pipelineRunsConfigStub: func() (*cfg.PipelineRunsConfigStruct, error) { return nil, error1 }, @@ 
-499,7 +494,7 @@ func Test_Controller_syncHandler_mock(t *testing.T) { State: api.StatePreparing, }, runManagerExpectation: func(rm *runmocks.MockManager, run *runmocks.MockRun) { - rm.EXPECT().Start(gomock.Any(), gomock.Any()).Return(nil) + rm.EXPECT().Start(gomock.Any(), gomock.Any()).Return("", "", nil) }, pipelineRunsConfigStub: newEmptyRunsConfig, expectedResult: api.ResultUndefined, @@ -512,7 +507,7 @@ func Test_Controller_syncHandler_mock(t *testing.T) { State: api.StatePreparing, }, runManagerExpectation: func(rm *runmocks.MockManager, run *runmocks.MockRun) { - rm.EXPECT().Start(gomock.Any(), gomock.Any()).Return(error1) + rm.EXPECT().Start(gomock.Any(), gomock.Any()).Return("", "", error1) }, pipelineRunsConfigStub: newEmptyRunsConfig, expectedResult: api.ResultUndefined, @@ -530,7 +525,7 @@ func Test_Controller_syncHandler_mock(t *testing.T) { }, runManagerExpectation: func(rm *runmocks.MockManager, run *runmocks.MockRun) { - rm.EXPECT().Start(gomock.Any(), gomock.Any()).Return(serrors.Classify(error1, api.ResultErrorContent)) + rm.EXPECT().Start(gomock.Any(), gomock.Any()).Return("", "", serrors.Classify(error1, api.ResultErrorContent)) }, pipelineRunsConfigStub: newEmptyRunsConfig, expectedResult: api.ResultErrorContent, @@ -632,7 +627,7 @@ func Test_Controller_syncHandler_mock(t *testing.T) { rm.EXPECT().GetRun(gomock.Any()).Return(nil, error1) }, pipelineRunsConfigStub: newEmptyRunsConfig, - expectedResult: "", + expectedResult: "error_infra", expectedState: api.StateCleaning, expectedMessage: "running failed .*error1", }, @@ -647,6 +642,8 @@ func Test_Controller_syncHandler_mock(t *testing.T) { &corev1.ContainerState{ Running: &corev1.ContainerStateRunning{}, }) + now := metav1.Now() + run.EXPECT().GetCompletionTime().Return(&now) run.EXPECT().IsFinished().Return(true, api.ResultTimeout) run.EXPECT().GetMessage() rm.EXPECT().GetRun(gomock.Any()).Return(run, nil) @@ -668,7 +665,9 @@ func Test_Controller_syncHandler_mock(t *testing.T) { Message: "message", }, }) + now := metav1.Now() run.EXPECT().IsFinished().Return(true, api.ResultSuccess) + run.EXPECT().GetCompletionTime().Return(&now) run.EXPECT().GetMessage() rm.EXPECT().GetRun(gomock.Any()).Return(run, nil) }, diff --git a/pkg/runctl/run.go b/pkg/runctl/run.go index 4ff74ff1..cff5f9a1 100644 --- a/pkg/runctl/run.go +++ b/pkg/runctl/run.go @@ -24,6 +24,11 @@ func (r *tektonRun) GetStartTime() *metav1.Time { return r.tektonTaskRun.Status.StartTime } +// GetCompletionTime returns completion time of run if already completed +func (r *tektonRun) GetCompletionTime() *metav1.Time { + return r.tektonTaskRun.Status.CompletionTime +} + // GetContainerInfo returns the state of the Jenkinsfile Runner container // as reported in the Tekton TaskRun status. 
func (r *tektonRun) GetContainerInfo() *corev1.ContainerState { diff --git a/pkg/runctl/run/interfaces.go b/pkg/runctl/run/interfaces.go index de8d2a38..f74010e4 100644 --- a/pkg/runctl/run/interfaces.go +++ b/pkg/runctl/run/interfaces.go @@ -10,7 +10,7 @@ import ( // Manager manages runs type Manager interface { - Start(pipelineRun k8s.PipelineRun, pipelineRunsConfig *cfg.PipelineRunsConfigStruct) error + Start(pipelineRun k8s.PipelineRun, pipelineRunsConfig *cfg.PipelineRunsConfigStruct) (string, string, error) GetRun(pipelineRun k8s.PipelineRun) (Run, error) Cleanup(pipelineRun k8s.PipelineRun) error } @@ -19,6 +19,7 @@ type Manager interface { type Run interface { GetStartTime() *metav1.Time IsFinished() (bool, steward.Result) + GetCompletionTime() *metav1.Time GetContainerInfo() *corev1.ContainerState GetMessage() string } diff --git a/pkg/runctl/run/mocks/mocks.go b/pkg/runctl/run/mocks/mocks.go index c7f2560f..3a5c1c64 100644 --- a/pkg/runctl/run/mocks/mocks.go +++ b/pkg/runctl/run/mocks/mocks.go @@ -61,6 +61,20 @@ func (m *MockRun) EXPECT() *MockRunMockRecorder { return m.recorder } +// GetCompletionTime mocks base method +func (m *MockRun) GetCompletionTime() *v10.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetCompletionTime") + ret0, _ := ret[0].(*v10.Time) + return ret0 +} + +// GetCompletionTime indicates an expected call of GetCompletionTime +func (mr *MockRunMockRecorder) GetCompletionTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCompletionTime", reflect.TypeOf((*MockRun)(nil).GetCompletionTime)) +} + // GetContainerInfo mocks base method func (m *MockRun) GetContainerInfo() *v1.ContainerState { m.ctrl.T.Helper() @@ -171,11 +185,13 @@ func (mr *MockManagerMockRecorder) GetRun(arg0 interface{}) *gomock.Call { } // Start mocks base method -func (m *MockManager) Start(arg0 k8s.PipelineRun, arg1 *cfg.PipelineRunsConfigStruct) error { +func (m *MockManager) Start(arg0 k8s.PipelineRun, arg1 *cfg.PipelineRunsConfigStruct) (string, string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Start", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(string) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // Start indicates an expected call of Start diff --git a/pkg/runctl/run_manager.go b/pkg/runctl/run_manager.go index 57feaa98..1b075e8f 100644 --- a/pkg/runctl/run_manager.go +++ b/pkg/runctl/run_manager.go @@ -64,8 +64,10 @@ type runManager struct { type runManagerTesting struct { cleanupStub func(*runContext) error copySecretsToRunNamespaceStub func(*runContext) (string, []string, error) + createTektonTaskRunStub func(*runContext) error getSecretManagerStub func(*runContext) runifc.SecretManager getServiceAccountSecretNameStub func(*runContext) string + prepareRunNamespaceStub func(*runContext) error setupLimitRangeFromConfigStub func(*runContext) error setupNetworkPolicyFromConfigStub func(*runContext) error setupNetworkPolicyThatIsolatesAllPodsStub func(*runContext) error @@ -94,33 +96,42 @@ func newRunManager(factory k8s.ClientFactory, secretProvider secrets.SecretProvi // Start prepares the isolated environment for a new run and starts // the run in this environment. 
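The reworked Start below returns the names of the created namespaces to the caller instead of writing them into the pipeline run status itself, and it tears the namespaces down again when a later step fails. The deferred cleanup works because err is a named result parameter: the deferred closure observes the error value that is finally returned, including one produced by the trailing return statement. A minimal sketch of this idiom (hypothetical names, not taken from this patch):

package main

import "fmt"

func prepare(fail bool) error {
	if fail {
		return fmt.Errorf("preparation failed")
	}
	return nil
}

// start mirrors the structure of runManager.Start: because err is a named
// result, the deferred closure sees the final error value and can undo
// partial work; the cleanup's own error is ignored, as in the patch.
func start(fail bool) (result string, err error) {
	defer func() {
		if err != nil {
			fmt.Println("cleaning up after error:", err)
		}
	}()
	if err = prepare(fail); err != nil {
		return "", err
	}
	return "done", nil
}

func main() {
	fmt.Println(start(true))  // cleanup runs, then the preparation error is returned
	fmt.Println(start(false)) // no cleanup; returns "done"
}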
-func (c *runManager) Start(pipelineRun k8s.PipelineRun, pipelineRunsConfig *cfg.PipelineRunsConfigStruct) error { - var err error +func (c *runManager) Start(pipelineRun k8s.PipelineRun, pipelineRunsConfig *cfg.PipelineRunsConfigStruct) (namespace string, auxNamespace string, err error) { + ctx := &runContext{ pipelineRun: pipelineRun, pipelineRunsConfig: pipelineRunsConfig, runNamespace: pipelineRun.GetRunNamespace(), auxNamespace: pipelineRun.GetAuxNamespace(), } - err = c.cleanup(ctx) + err = c.cleanupNamespaces(ctx) if err != nil { - return err + return "", "", err } + + // If something goes wrong while creating objects inside the namespaces, we delete everything. + defer func() { + if err != nil { + c.cleanupNamespaces(ctx) // clean-up ignoring error + } + }() + err = c.prepareRunNamespace(ctx) if err != nil { - return err - } - err = c.createTektonTaskRun(ctx) - if err != nil { - return err + return "", "", err } - return nil + return ctx.runNamespace, ctx.auxNamespace, c.createTektonTaskRun(ctx) } // prepareRunNamespace creates a new namespace for the pipeline run // and populates it with needed resources. func (c *runManager) prepareRunNamespace(ctx *runContext) error { + + if c.testing != nil && c.testing.prepareRunNamespaceStub != nil { + return c.testing.prepareRunNamespaceStub(ctx) + } + var err error randName, err := utils.RandomAlphaNumString(runNamespaceRandomLength) @@ -130,25 +141,15 @@ func (c *runManager) prepareRunNamespace(ctx *runContext) error { ctx.runNamespace, err = c.createNamespace(ctx, "main", randName) if err != nil { - return errors.Wrap(err, "failed to create main run namespace") + return err } - ctx.pipelineRun.UpdateRunNamespace(ctx.runNamespace) if featureflag.CreateAuxNamespaceIfUnused.Enabled() { ctx.auxNamespace, err = c.createNamespace(ctx, "aux", randName) if err != nil { - return errors.Wrap(err, "failed to create auxiliary run namespace") - } - ctx.pipelineRun.UpdateAuxNamespace(ctx.auxNamespace) - } - - // If something goes wrong while creating objects inside the namespaces, we delete everything. 
- cleanupOnError := func() { - if err != nil { - c.cleanup(ctx) // clean-up ignoring error + return err } } - defer cleanupOnError() pipelineCloneSecretName, imagePullSecretNames, err := c.copySecretsToRunNamespace(ctx) if err != nil { @@ -460,6 +461,11 @@ func (c *runManager) getServiceAccountSecretName(ctx *runContext) string { } func (c *runManager) createTektonTaskRun(ctx *runContext) error { + + if c.testing != nil && c.testing.createTektonTaskRunStub != nil { + return c.testing.createTektonTaskRunStub(ctx) + } + var err error copyInt64Ptr := func(ptr *int64) *int64 { @@ -660,10 +666,10 @@ func (c *runManager) Cleanup(pipelineRun k8s.PipelineRun) error { runNamespace: pipelineRun.GetRunNamespace(), auxNamespace: pipelineRun.GetAuxNamespace(), } - return c.cleanup(ctx) + return c.cleanupNamespaces(ctx) } -func (c *runManager) cleanup(ctx *runContext) error { +func (c *runManager) cleanupNamespaces(ctx *runContext) error { if c.testing != nil && c.testing.cleanupStub != nil { return c.testing.cleanupStub(ctx) } @@ -675,9 +681,7 @@ func (c *runManager) cleanup(ctx *runContext) error { PropagationPolicy: &deletePropagation, } } - - var firstErr error - + errors := []error{} namespacesToDelete := []string{ ctx.runNamespace, ctx.auxNamespace, @@ -687,17 +691,21 @@ func (c *runManager) cleanup(ctx *runContext) error { continue } err := c.deleteNamespace(name, deleteOptions) - if err != nil && firstErr == nil { - firstErr = err + if err != nil { + errors = append(errors, err) } } - - if firstErr != nil { - // TODO Don't store on resource as message. Add it as event. - ctx.pipelineRun.StoreErrorAsMessage(firstErr, "cleanup failed") + if len(errors) == 0 { + return nil } - - return firstErr + if len(errors) == 1 { + return errors[0] + } + msg := []string{} + for _, e := range errors { + msg = append(msg, e.Error()) + } + return fmt.Errorf("cannot delete all namespaces: %s", strings.Join(msg, ", ")) } func (c *runManager) createNamespace(ctx *runContext, purpose, randName string) (string, error) { diff --git a/pkg/runctl/run_manager_test.go b/pkg/runctl/run_manager_test.go index 670e9e3b..6669b9b4 100644 --- a/pkg/runctl/run_manager_test.go +++ b/pkg/runctl/run_manager_test.go @@ -109,12 +109,12 @@ func Test__runManager_prepareRunNamespace__CreatesNamespaces(t *testing.T) { pipelineRun1 := h.getPipelineRunFromStorage(cf, h.namespace1, h.pipelineRun1) expectedNamespaces := []string{h.namespace1} - h.verifyNamespace(cf, pipelineRun1.Status.Namespace, "main") - expectedNamespaces = append(expectedNamespaces, pipelineRun1.Status.Namespace) + h.verifyNamespace(cf, runCtx.runNamespace, "main") + expectedNamespaces = append(expectedNamespaces, runCtx.runNamespace) if ffEnabled { - h.verifyNamespace(cf, pipelineRun1.Status.AuxiliaryNamespace, "aux") - expectedNamespaces = append(expectedNamespaces, pipelineRun1.Status.AuxiliaryNamespace) + h.verifyNamespace(cf, runCtx.auxNamespace, "aux") + expectedNamespaces = append(expectedNamespaces, runCtx.auxNamespace) } else { assert.Equal(t, pipelineRun1.Status.AuxiliaryNamespace, "") } @@ -150,17 +150,9 @@ func Test__runManager_prepareRunNamespace__Calls__copySecretsToRunNamespace__And methodCalled = true assert.Assert(t, ctx.pipelineRun == pipelineRunHelper) assert.Assert(t, ctx.runNamespace != "") - assert.Equal(t, pipelineRunHelper.GetRunNamespace(), ctx.runNamespace) return "", nil, expectedError } - var cleanupCalled bool - examinee.testing.cleanupStub = func(ctx *runContext) error { - assert.Assert(t, ctx.pipelineRun == pipelineRunHelper) - cleanupCalled 
-		return nil
-	}
-
 	runCtx := &runContext{
 		pipelineRun:        pipelineRunHelper,
 		pipelineRunsConfig: config,
@@ -172,7 +164,6 @@ func Test__runManager_prepareRunNamespace__Calls__copySecretsToRunNamespace__And
 
 	// VERIFY
 	assert.Equal(t, expectedError, resultErr)
 	assert.Assert(t, methodCalled == true)
-	assert.Assert(t, cleanupCalled == true)
 }
 
 func Test__runManager_prepareRunNamespace__Calls_setupServiceAccount_AndPropagatesError(t *testing.T) {
@@ -201,7 +192,6 @@ func Test__runManager_prepareRunNamespace__Calls_setupServiceAccount_AndPropagat
 	examinee.testing.setupServiceAccountStub = func(ctx *runContext, pipelineCloneSecretName string, imagePullSecretNames []string) error {
 		methodCalled = true
 		assert.Assert(t, ctx.runNamespace != "")
-		assert.Equal(t, pipelineRunHelper.GetRunNamespace(), ctx.runNamespace)
 		assert.Equal(t, expectedPipelineCloneSecretName, pipelineCloneSecretName)
 		assert.DeepEqual(t, expectedImagePullSecretNames, imagePullSecretNames)
 		return expectedError
@@ -210,13 +200,6 @@ func Test__runManager_prepareRunNamespace__Calls_setupServiceAccount_AndPropagat
 		return expectedPipelineCloneSecretName, expectedImagePullSecretNames, nil
 	}
 
-	var cleanupCalled bool
-	examinee.testing.cleanupStub = func(ctx *runContext) error {
-		assert.Assert(t, ctx.pipelineRun == pipelineRunHelper)
-		cleanupCalled = true
-		return nil
-	}
-
 	runCtx := &runContext{
 		pipelineRun:        pipelineRunHelper,
 		pipelineRunsConfig: config,
@@ -228,7 +211,6 @@ func Test__runManager_prepareRunNamespace__Calls_setupServiceAccount_AndPropagat
 
 	// VERIFY
 	assert.Equal(t, expectedError, resultErr)
 	assert.Assert(t, methodCalled == true)
-	assert.Assert(t, cleanupCalled == true)
 }
 
 func Test__runManager_prepareRunNamespace__Calls_setupStaticNetworkPolicies_AndPropagatesError(t *testing.T) {
@@ -255,17 +237,9 @@ func Test__runManager_prepareRunNamespace__Calls_setupStaticNetworkPolicies_AndP
 	examinee.testing.setupStaticNetworkPoliciesStub = func(ctx *runContext) error {
 		methodCalled = true
 		assert.Assert(t, ctx.runNamespace != "")
-		assert.Equal(t, pipelineRunHelper.GetRunNamespace(), ctx.runNamespace)
 		return expectedError
 	}
 
-	var cleanupCalled bool
-	examinee.testing.cleanupStub = func(ctx *runContext) error {
-		assert.Assert(t, ctx.pipelineRun == pipelineRunHelper)
-		cleanupCalled = true
-		return nil
-	}
-
 	runCtx := &runContext{
 		pipelineRun:        pipelineRunHelper,
 		pipelineRunsConfig: config,
@@ -277,7 +251,6 @@ func Test__runManager_prepareRunNamespace__Calls_setupStaticNetworkPolicies_AndP
 
 	// VERIFY
 	assert.Equal(t, expectedError, resultErr)
 	assert.Assert(t, methodCalled == true)
-	assert.Assert(t, cleanupCalled == true)
 }
 
 func Test__runManager_setupStaticNetworkPolicies__Succeeds(t *testing.T) {
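The removed `cleanupStub` blocks above all belonged to `prepareRunNamespace` tests: with this patch, `prepareRunNamespace` no longer cleans up after itself on error; the caller does. Judging from the new `Start` test below, the intended choreography appears to be one cleanup up front plus a deferred cleanup that only fires on error, with the original error taking precedence. A self-contained sketch of that assumed choreography (all types and method bodies here are simplified placeholders, not the actual implementation):

package main

import "fmt"

type runContext struct{ runNamespace string }

type runManager struct{}

func (c *runManager) cleanupNamespaces(ctx *runContext) error { return nil }
func (c *runManager) prepareRunNamespace(ctx *runContext) error {
	return fmt.Errorf("cannot prepare run namespace: foo")
}
func (c *runManager) createTektonTaskRun(ctx *runContext) error { return nil }

// start sketches the assumed cleanup choreography: clean up once before
// doing any work, and once more via defer, but only if an error occurred.
func (c *runManager) start(ctx *runContext) (err error) {
	if cleanupErr := c.cleanupNamespaces(ctx); cleanupErr != nil {
		return cleanupErr // the "failing inside initial cleanup" case
	}
	defer func() {
		if err != nil {
			// Best effort: a failure here must not mask the original error.
			c.cleanupNamespaces(ctx)
		}
	}()
	if err = c.prepareRunNamespace(ctx); err != nil {
		return err
	}
	return c.createTektonTaskRun(ctx)
}

func main() {
	c := &runManager{}
	fmt.Println(c.start(&runContext{})) // prepare fails; deferred cleanup still runs
}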
@@ -1291,16 +1264,103 @@ func Test__runManager_Start__CreatesTektonTaskRun(t *testing.T) {
 	examinee.testing = newRunManagerTestingWithRequiredStubs()
 
 	// EXERCISE
-	resultError := examinee.Start(mockPipelineRun, config)
+	runNamespace, _, resultError := examinee.Start(mockPipelineRun, config)
 	assert.NilError(t, resultError)
 
 	// VERIFY
-	result, err := mockFactory.TektonV1beta1().TaskRuns(mockPipelineRun.GetRunNamespace()).Get(
+	result, err := mockFactory.TektonV1beta1().TaskRuns(runNamespace).Get(
 		tektonTaskRunName, metav1.GetOptions{})
 	assert.NilError(t, err)
 	assert.Assert(t, result != nil)
 }
 
+func Test__runManager_Start__Perform_cleanup_on_error(t *testing.T) {
+	t.Parallel()
+
+	prepareRunnamespaceErr := fmt.Errorf("cannot prepare run namespace: foo")
+	createTektonTaskRunError := fmt.Errorf("cannot create tekton task run: foo")
+	cleanupError := fmt.Errorf("cannot cleanup: foo")
+
+	for _, test := range []struct {
+		name                     string
+		prepareRunNamespaceError error
+		createTektonTaskRunError error
+		cleanupError             error
+		failOnCleanupCount       int
+		expectedError            error
+		expectedCleanupCount     int
+	}{
+		{
+			name:                 "no failure",
+			expectedCleanupCount: 1, // before; no cleanup afterwards since no error occurred
+		},
+		{
+			name:                     "failing inside prepareRunNamespace",
+			prepareRunNamespaceError: prepareRunnamespaceErr,
+			expectedError:            prepareRunnamespaceErr,
+			expectedCleanupCount:     2, // before and after (since an error occurred)
+		},
+		{
+			name:                     "failing inside creating tekton task run",
+			createTektonTaskRunError: createTektonTaskRunError,
+			expectedError:            createTektonTaskRunError,
+			expectedCleanupCount:     2, // before and after (since an error occurred)
+		},
+		{
+			name:                 "failing inside initial cleanup",
+			failOnCleanupCount:   1,
+			cleanupError:         cleanupError,
+			expectedError:        cleanupError,
+			expectedCleanupCount: 1, // we fail inside the initial cleanup, but it does get called
+		},
+		{
+			name:                     "failing inside deferred cleanup",
+			prepareRunNamespaceError: prepareRunnamespaceErr,
+			failOnCleanupCount:       2,
+			cleanupError:             cleanupError,
+			expectedError:            prepareRunnamespaceErr, // we still expect the "content" error
+			expectedCleanupCount:     2, // we fail inside the second (deferred) cleanup
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			// SETUP
+			h := newTestHelper1(t)
+			mockCtrl := gomock.NewController(t)
+			defer mockCtrl.Finish()
+			mockFactory, mockPipelineRun, mockSecretProvider := h.prepareMocks(mockCtrl)
+			config := &cfg.PipelineRunsConfigStruct{}
+
+			examinee := newRunManager(mockFactory, mockSecretProvider)
+			examinee.testing = newRunManagerTestingWithRequiredStubs()
+
+			var cleanupCalled int
+			examinee.testing.cleanupStub = func(ctx *runContext) error {
+				assert.Assert(t, ctx.pipelineRun == mockPipelineRun)
+				cleanupCalled++
+				if test.cleanupError != nil && cleanupCalled == test.failOnCleanupCount {
+					return test.cleanupError
+				}
+				return nil
+			}
+			examinee.testing.createTektonTaskRunStub = func(ctx *runContext) error {
+				return test.createTektonTaskRunError
+			}
+			examinee.testing.prepareRunNamespaceStub = func(ctx *runContext) error {
+				return test.prepareRunNamespaceError
+			}
+
+			// EXERCISE
+			_, _, resultError := examinee.Start(mockPipelineRun, config)
+
+			// VERIFY
+			if test.expectedError != nil {
+				assert.Error(t, resultError, test.expectedError.Error())
+			}
+			assert.Assert(t, cleanupCalled == test.expectedCleanupCount)
+		})
+	}
+}
+
 func Test__runManager_addTektonTaskRunParamsForJenkinsfileRunnerImage(t *testing.T) {
 	t.Parallel()
@@ -1419,12 +1479,12 @@ func Test__runManager_Start__DoesNotSetPipelineRunStatus(t *testing.T) {
 	examinee.testing = newRunManagerTestingWithRequiredStubs()
 
 	// EXERCISE
-	resultError := examinee.Start(mockPipelineRun, config)
+	_, _, resultError := examinee.Start(mockPipelineRun, config)
 	assert.NilError(t, resultError)
 
 	// VERIFY
 	// UpdateState should never be called
-	mockPipelineRun.EXPECT().UpdateState(gomock.Any()).Times(0)
+	mockPipelineRun.EXPECT().UpdateState(gomock.Any(), gomock.Any()).Times(0)
 }
 
 func Test__runManager_copySecretsToRunNamespace__DoesCopySecret(t *testing.T) {
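A side note on the `Times(0)` expectation above: this is the standard gomock way to assert that a method is never called, and since `UpdateState` now takes two arguments, both parameter slots need a matcher. A minimal, hypothetical usage example (the test name and surrounding test body are invented for illustration):

package runctl_example

import (
	"testing"

	mocks "github.com/SAP/stewardci-core/pkg/k8s/mocks"
	gomock "github.com/golang/mock/gomock"
)

func Test__UpdateState__IsNeverCalled(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	mockPipelineRun := mocks.NewMockPipelineRun(mockCtrl)
	// One matcher per parameter of the (new) two-argument UpdateState;
	// Times(0) makes any actual call fail the test.
	mockPipelineRun.EXPECT().UpdateState(gomock.Any(), gomock.Any()).Times(0)

	// ... exercise code that must not update the state ...
}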
@@ -1492,6 +1552,10 @@ func Test__runManager_Cleanup__RemovesNamespaces(t *testing.T) {
 	}
 	err = examinee.prepareRunNamespace(runCtx)
 	assert.NilError(t, err)
+	runCtx.pipelineRun.UpdateRunNamespace(runCtx.runNamespace)
+	runCtx.pipelineRun.UpdateAuxNamespace(runCtx.auxNamespace)
+	_, err = runCtx.pipelineRun.CommitStatus()
+	assert.NilError(t, err)
 	{
 		pipelineRun1 := h.getPipelineRunFromStorage(cf, h.namespace1, h.pipelineRun1)
 		expectedNamespaces := []string{h.namespace1}
@@ -1946,6 +2010,7 @@ func (*testHelper1) prepareMocksWithSpec(ctrl *gomock.Controller, spec *stewardv
 	mockPipelineRun.EXPECT().UpdateAuxNamespace(gomock.Any()).Do(func(arg string) {
 		auxNamespace = arg
 	}).MaxTimes(1)
+	mockPipelineRun.EXPECT().CommitStatus().MaxTimes(1)
 
 	mockSecretProvider := secretmocks.NewMockSecretProvider(ctrl)
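The `CommitStatus()` expectation registered above mirrors the core idea of this patch: setters such as `UpdateRunNamespace` and `UpdateAuxNamespace` now only modify the in-memory representation, and a single `CommitStatus` call persists all buffered changes together, so no observer ever sees a half-updated pipeline run. A schematic, self-contained sketch of that accumulate-then-commit model (simplified types invented for this example, not the real implementation):

package main

import "fmt"

// status is a stand-in for the pipeline run status subresource.
type status struct {
	runNamespace string
	auxNamespace string
}

// pipelineRun buffers status changes in memory until CommitStatus is called.
type pipelineRun struct {
	committed status // what the API server would currently see
	pending   status // local changes, not yet visible to anyone else
}

func (r *pipelineRun) UpdateRunNamespace(ns string) { r.pending.runNamespace = ns }
func (r *pipelineRun) UpdateAuxNamespace(ns string) { r.pending.auxNamespace = ns }

// CommitStatus publishes all buffered changes in one update, so there is
// no window in which only part of the new status is visible.
func (r *pipelineRun) CommitStatus() error {
	r.committed = r.pending // one atomic "update" instead of several
	return nil
}

func main() {
	r := &pipelineRun{}
	r.UpdateRunNamespace("steward-run-xyz")
	r.UpdateAuxNamespace("steward-run-xyz-aux")
	if err := r.CommitStatus(); err != nil {
		fmt.Println("commit failed:", err)
		return
	}
	fmt.Printf("%+v\n", r.committed) // both namespaces become visible together
}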