diff --git a/tests/workflow_memo_test.go b/tests/workflow_memo_test.go index f4bcd111ea7..8ff86a120e6 100644 --- a/tests/workflow_memo_test.go +++ b/tests/workflow_memo_test.go @@ -5,7 +5,8 @@ import ( "time" "github.com/google/uuid" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -16,26 +17,28 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/payloads" + "go.temporal.io/server/common/testing/parallelsuite" "go.temporal.io/server/tests/testcore" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) type WorkflowMemoTestSuite struct { - testcore.FunctionalTestBase + parallelsuite.Suite[*WorkflowMemoTestSuite] } func TestWorkflowMemoTestSuite(t *testing.T) { - t.Parallel() - suite.Run(t, new(WorkflowMemoTestSuite)) + parallelsuite.Run(t, &WorkflowMemoTestSuite{}) } -type RunIdGetter interface { +type RunIDGetter interface { GetRunId() string } -type startFunc func() (RunIdGetter, error) +type startFunc func() (RunIDGetter, error) func (s *WorkflowMemoTestSuite) TestStartWithMemo() { + env := testcore.NewEnv(s.T()) + id := "functional-start-with-memo-test" wt := "functional-start-with-memo-test-type" tl := "functional-start-with-memo-test-taskqueue" @@ -49,7 +52,7 @@ func (s *WorkflowMemoTestSuite) TestStartWithMemo() { request := &workflowservice.StartWorkflowExecutionRequest{ RequestId: uuid.NewString(), - Namespace: s.Namespace().String(), + Namespace: env.Namespace().String(), WorkflowId: id, WorkflowType: &commonpb.WorkflowType{Name: wt}, TaskQueue: &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, @@ -60,10 +63,10 @@ func (s *WorkflowMemoTestSuite) TestStartWithMemo() { Memo: memo, } - fn := func() (RunIdGetter, error) { - return s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) + fn := func() (RunIDGetter, error) { + return env.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) } - s.startWithMemoHelper(fn, id, &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, memo, ` + s.startWithMemoHelper(env, fn, id, &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, memo, ` 1 WorkflowExecutionStarted {"Memo":{"Fields":{"Info":{"Data":"\"memo-value\""}}}} 2 WorkflowTaskScheduled 3 WorkflowTaskStarted @@ -72,6 +75,8 @@ func (s *WorkflowMemoTestSuite) TestStartWithMemo() { } func (s *WorkflowMemoTestSuite) TestSignalWithStartWithMemo() { + env := testcore.NewEnv(s.T()) + id := "functional-signal-with-start-with-memo-test" wt := "functional-signal-with-start-with-memo-test-type" tl := "functional-signal-with-start-with-memo-test-taskqueue" @@ -87,7 +92,7 @@ func (s *WorkflowMemoTestSuite) TestSignalWithStartWithMemo() { signalInput := payloads.EncodeString("my signal input") request := &workflowservice.SignalWithStartWorkflowExecutionRequest{ RequestId: uuid.NewString(), - Namespace: s.Namespace().String(), + Namespace: env.Namespace().String(), WorkflowId: id, WorkflowType: &commonpb.WorkflowType{Name: wt}, TaskQueue: &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, @@ -100,10 +105,10 @@ func (s *WorkflowMemoTestSuite) TestSignalWithStartWithMemo() { Memo: memo, } - fn := func() (RunIdGetter, error) { - return s.FrontendClient().SignalWithStartWorkflowExecution(testcore.NewContext(), request) + fn := func() (RunIDGetter, error) { + return env.FrontendClient().SignalWithStartWorkflowExecution(testcore.NewContext(), request) } - s.startWithMemoHelper(fn, id, &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, memo, ` + s.startWithMemoHelper(env, fn, id, &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, memo, ` 1 WorkflowExecutionStarted {"Memo":{"Fields":{"Info":{"Data":"\"memo-value\""}}}} 2 WorkflowExecutionSignaled 3 WorkflowTaskScheduled @@ -113,13 +118,13 @@ func (s *WorkflowMemoTestSuite) TestSignalWithStartWithMemo() { } // helper function for TestStartWithMemo and TestSignalWithStartWithMemo to reduce duplicate code -func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string, taskQueue *taskqueuepb.TaskQueue, memo *commonpb.Memo, expectedHistory string) { +func (s *WorkflowMemoTestSuite) startWithMemoHelper(env *testcore.TestEnv, startFn startFunc, id string, taskQueue *taskqueuepb.TaskQueue, memo *commonpb.Memo, expectedHistory string) { identity := "worker1" we, err0 := startFn() s.NoError(err0) - s.Logger.Info("StartWorkflowExecution: response", tag.WorkflowRunID(we.GetRunId())) + env.Logger.Info("StartWorkflowExecution: response", tag.WorkflowRunID(we.GetRunId())) wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { return []*commandpb.Command{{ @@ -131,21 +136,21 @@ func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string } poller := &testcore.TaskPoller{ - Client: s.FrontendClient(), - Namespace: s.Namespace().String(), + Client: env.FrontendClient(), + Namespace: env.Namespace().String(), TaskQueue: taskQueue, Identity: identity, WorkflowTaskHandler: wtHandler, - Logger: s.Logger, + Logger: env.Logger, T: s.T(), } // verify open visibility var openExecutionInfo *workflowpb.WorkflowExecutionInfo - s.Eventually( - func() bool { - resp, err1 := s.FrontendClient().ListOpenWorkflowExecutions(testcore.NewContext(), &workflowservice.ListOpenWorkflowExecutionsRequest{ - Namespace: s.Namespace().String(), + s.EventuallyWithT( + func(t *assert.CollectT) { + resp, err1 := env.FrontendClient().ListOpenWorkflowExecutions(testcore.NewContext(), &workflowservice.ListOpenWorkflowExecutionsRequest{ + Namespace: env.Namespace().String(), MaximumPageSize: 100, StartTimeFilter: &filterpb.StartTimeFilter{ EarliestTime: nil, @@ -155,18 +160,13 @@ func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string WorkflowId: id, }}, }) - s.NoError(err1) - if len(resp.Executions) == 1 { - openExecutionInfo = resp.Executions[0] - return true - } - s.Logger.Info("Open WorkflowExecution is not yet visible") - return false + require.NoError(t, err1) + require.Len(t, resp.Executions, 1) + openExecutionInfo = resp.Executions[0] }, testcore.WaitForESToSettle, 100*time.Millisecond, ) - s.NotNil(openExecutionInfo) s.ProtoEqual(memo, openExecutionInfo.Memo) execution := &commonpb.WorkflowExecution{ @@ -176,33 +176,33 @@ func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string // verify DescribeWorkflowExecution result: workflow running descRequest := &workflowservice.DescribeWorkflowExecutionRequest{ - Namespace: s.Namespace().String(), + Namespace: env.Namespace().String(), Execution: execution, } - descResp, err := s.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), descRequest) + descResp, err := env.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), descRequest) s.NoError(err) s.ProtoEqual(memo, descResp.WorkflowExecutionInfo.Memo) // make progress of workflow _, err = poller.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + env.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) s.NoError(err) // verify history - historyEvents := s.GetHistory(s.Namespace().String(), execution) + historyEvents := env.GetHistory(env.Namespace().String(), execution) s.EqualHistoryEvents(expectedHistory, historyEvents) // verify DescribeWorkflowExecution result: workflow closed, but close visibility task not completed - descResp, err = s.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), descRequest) + descResp, err = env.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), descRequest) s.NoError(err) s.ProtoEqual(memo, descResp.WorkflowExecutionInfo.Memo) // verify closed visibility var closedExecutionInfo *workflowpb.WorkflowExecutionInfo - s.Eventually( - func() bool { - resp, err1 := s.FrontendClient().ListClosedWorkflowExecutions(testcore.NewContext(), &workflowservice.ListClosedWorkflowExecutionsRequest{ - Namespace: s.Namespace().String(), + s.EventuallyWithT( + func(t *assert.CollectT) { + resp, err1 := env.FrontendClient().ListClosedWorkflowExecutions(testcore.NewContext(), &workflowservice.ListClosedWorkflowExecutionsRequest{ + Namespace: env.Namespace().String(), MaximumPageSize: 100, StartTimeFilter: &filterpb.StartTimeFilter{ EarliestTime: nil, @@ -212,22 +212,17 @@ func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string WorkflowId: id, }}, }) - s.NoError(err1) - if len(resp.Executions) == 1 { - closedExecutionInfo = resp.Executions[0] - return true - } - s.Logger.Info("Closed WorkflowExecution is not yet visible") - return false + require.NoError(t, err1) + require.Len(t, resp.Executions, 1) + closedExecutionInfo = resp.Executions[0] }, testcore.WaitForESToSettle, 100*time.Millisecond, ) - s.NotNil(closedExecutionInfo) s.ProtoEqual(memo, closedExecutionInfo.Memo) // verify DescribeWorkflowExecution result: workflow closed and close visibility task completed - descResp, err = s.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), descRequest) + descResp, err = env.FrontendClient().DescribeWorkflowExecution(testcore.NewContext(), descRequest) s.NoError(err) s.ProtoEqual(memo, descResp.WorkflowExecutionInfo.Memo) }