From e88ffaaa7adb9eebd2e615e68e0b8bc13258db78 Mon Sep 17 00:00:00 2001 From: Tianxin Dong Date: Sat, 17 Aug 2024 00:14:12 +0800 Subject: [PATCH] Fix: fix context in provider (#194) fix: fix context in provider Signed-off-by: FogDong --- pkg/executor/workflow.go | 17 +++++++------- pkg/executor/workflow_test.go | 36 +++++++++++++++++------------- pkg/providers/http/http.cue | 2 +- pkg/providers/http/http.go | 4 ++++ pkg/providers/legacy/http/http.cue | 2 +- pkg/providers/legacy/http/http.go | 6 ++++- pkg/tasks/custom/action.go | 2 +- 7 files changed, 41 insertions(+), 28 deletions(-) diff --git a/pkg/executor/workflow.go b/pkg/executor/workflow.go index 7d4b775..84156e8 100644 --- a/pkg/executor/workflow.go +++ b/pkg/executor/workflow.go @@ -113,12 +113,6 @@ func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunner } return v1alpha1.WorkflowStateFailed, nil } - if checkWorkflowSuspended(status) { - return v1alpha1.WorkflowStateSuspending, nil - } - if allRunnersSucceeded { - return v1alpha1.WorkflowStateSucceeded, nil - } wfCtx, err := w.makeContext(ctx, w.instance.Name) if err != nil { @@ -127,6 +121,13 @@ func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunner } w.wfCtx = wfCtx + if checkWorkflowSuspended(status) { + return v1alpha1.WorkflowStateSuspending, nil + } + if allRunnersSucceeded { + return v1alpha1.WorkflowStateSucceeded, nil + } + if cacheValue, ok := StepStatusCache.Load(cacheKey); ok { // handle cache resource if len(status.Steps) < cacheValue.(int) { @@ -173,11 +174,11 @@ func checkWorkflowSuspended(status *v1alpha1.WorkflowRunStatus) bool { // if workflow is suspended and the suspended step is still running, return false to run the suspended step if status.Suspend { for _, step := range status.Steps { - if step.Phase == v1alpha1.WorkflowStepPhaseSuspending { + if step.Reason == types.StatusReasonSuspend && step.Phase == v1alpha1.WorkflowStepPhaseSuspending { return false } for _, sub := range step.SubStepsStatus { - if sub.Phase == v1alpha1.WorkflowStepPhaseSuspending { + if sub.Reason == types.StatusReasonSuspend && sub.Phase == v1alpha1.WorkflowStepPhaseSuspending { return false } } diff --git a/pkg/executor/workflow_test.go b/pkg/executor/workflow_test.go index b2acba9..5c02ae7 100644 --- a/pkg/executor/workflow_test.go +++ b/pkg/executor/workflow_test.go @@ -1772,10 +1772,11 @@ var _ = Describe("Test Workflow", func() { }, }, { StepStatus: v1alpha1.StepStatus{ - Name: "s2", - ID: "s2", - Type: "suspend", - Phase: v1alpha1.WorkflowStepPhaseSuspending, + Name: "s2", + ID: "s2", + Type: "suspend", + Reason: types.StatusReasonSuspend, + Phase: v1alpha1.WorkflowStepPhaseSuspending, }, }}, })).Should(BeEquivalentTo("")) @@ -1805,10 +1806,11 @@ var _ = Describe("Test Workflow", func() { }, }, { StepStatus: v1alpha1.StepStatus{ - Name: "s2", - ID: "s2", - Type: "suspend", - Phase: v1alpha1.WorkflowStepPhaseSucceeded, + Name: "s2", + ID: "s2", + Type: "suspend", + Reason: types.StatusReasonSuspend, + Phase: v1alpha1.WorkflowStepPhaseSucceeded, }, }, { StepStatus: v1alpha1.StepStatus{ @@ -1884,10 +1886,11 @@ var _ = Describe("Test Workflow", func() { Type: "success", Phase: v1alpha1.WorkflowStepPhaseSucceeded, }, { - Name: "s2-sub2", - ID: "s2-sub2", - Type: "suspend", - Phase: v1alpha1.WorkflowStepPhaseSuspending, + Name: "s2-sub2", + ID: "s2-sub2", + Type: "suspend", + Reason: types.StatusReasonSuspend, + Phase: v1alpha1.WorkflowStepPhaseSuspending, }, }, }}, @@ -2234,10 +2237,11 @@ func makeRunner(step v1alpha1.WorkflowStep, subTaskRunners []types.TaskRunner) t } } return v1alpha1.StepStatus{ - Name: step.Name, - Type: "suspend", - ID: step.Name, - Phase: v1alpha1.WorkflowStepPhaseSuspending, + Name: step.Name, + Type: "suspend", + ID: step.Name, + Phase: v1alpha1.WorkflowStepPhaseSuspending, + Reason: types.StatusReasonSuspend, }, &types.Operation{ Suspend: true, }, nil diff --git a/pkg/providers/http/http.cue b/pkg/providers/http/http.cue index c570ac8..403dc44 100644 --- a/pkg/providers/http/http.cue +++ b/pkg/providers/http/http.cue @@ -29,7 +29,7 @@ // +usgae=The tls config of the request tls_config?: { secret: string - namespace: context.namespace + namespace?: string } } diff --git a/pkg/providers/http/http.go b/pkg/providers/http/http.go index f6069f0..d0a3f77 100644 --- a/pkg/providers/http/http.go +++ b/pkg/providers/http/http.go @@ -34,6 +34,7 @@ import ( cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + "github.com/kubevela/workflow/pkg/cue/model" "github.com/kubevela/workflow/pkg/providers/legacy/http/ratelimiter" providertypes "github.com/kubevela/workflow/pkg/providers/types" ) @@ -145,6 +146,9 @@ func runHTTP(ctx context.Context, params *DoParams) (*DoReturns, error) { req.Trailer = trailer if params.Params.TLSConfig != nil { + if params.Params.TLSConfig.Namespace == "" { + params.Params.TLSConfig.Namespace = fmt.Sprint(params.ProcessContext.GetData(model.ContextNamespace)) + } if tr, err := getTransport(ctx, params.KubeClient, params.Params.TLSConfig.Secret, params.Params.TLSConfig.Namespace); err == nil && tr != nil { defaultClient.Transport = tr } diff --git a/pkg/providers/legacy/http/http.cue b/pkg/providers/legacy/http/http.cue index 566674e..98ceaad 100644 --- a/pkg/providers/legacy/http/http.cue +++ b/pkg/providers/legacy/http/http.cue @@ -28,7 +28,7 @@ // +usgae=The tls config of the request tls_config?: { secret: string - namespace: context.namespace + namespace?: string } // +usage=The response of the request will be filled in this field after the action is executed response: { diff --git a/pkg/providers/legacy/http/http.go b/pkg/providers/legacy/http/http.go index db59f2b..77688f4 100644 --- a/pkg/providers/legacy/http/http.go +++ b/pkg/providers/legacy/http/http.go @@ -34,6 +34,7 @@ import ( cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + "github.com/kubevela/workflow/pkg/cue/model" "github.com/kubevela/workflow/pkg/providers/legacy/http/ratelimiter" providertypes "github.com/kubevela/workflow/pkg/providers/types" ) @@ -69,7 +70,7 @@ type RateLimiter struct { // TLSConfig . type TLSConfig struct { Secret string `json:"secret"` - Namespace string `json:"namespace"` + Namespace string `json:"namespace,omitempty"` } // RequestVars is the vars for http request @@ -142,6 +143,9 @@ func runHTTP(ctx context.Context, params *DoParams) (*ResponseVars, error) { req.Trailer = trailer if params.Params.TLSConfig != nil { + if params.Params.TLSConfig.Namespace == "" { + params.Params.TLSConfig.Namespace = fmt.Sprint(params.ProcessContext.GetData(model.ContextNamespace)) + } if tr, err := getTransport(ctx, params.KubeClient, params.Params.TLSConfig.Secret, params.Params.TLSConfig.Namespace); err == nil && tr != nil { defaultClient.Transport = tr } diff --git a/pkg/tasks/custom/action.go b/pkg/tasks/custom/action.go index b9020c2..607ff02 100644 --- a/pkg/tasks/custom/action.go +++ b/pkg/tasks/custom/action.go @@ -85,7 +85,7 @@ func (exec *executor) Terminate(message string) { // Wait let workflow wait. func (exec *executor) Wait(message string) { exec.wait = true - if exec.wfStatus.Phase != v1alpha1.WorkflowStepPhaseFailed { + if exec.wfStatus.Phase != v1alpha1.WorkflowStepPhaseFailed && exec.wfStatus.Phase != v1alpha1.WorkflowStepPhaseSuspending { exec.wfStatus.Phase = v1alpha1.WorkflowStepPhaseRunning exec.wfStatus.Reason = types.StatusReasonWait if message != "" {