Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 26 additions & 40 deletions tests/workflow_memo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,34 @@ import (
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/suite"
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
filterpb "go.temporal.io/api/filter/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/tests/testcore"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type WorkflowMemoTestSuite struct {
testcore.FunctionalTestBase
type RunIDGetter interface {
GetRunId() string
}
type startFunc func() (RunIDGetter, error)

func TestWorkflowMemoTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(WorkflowMemoTestSuite))
t.Run("TestStartWithMemo", testStartWithMemo)
t.Run("TestSignalWithStartWithMemo", testSignalWithStartWithMemo)
}

type RunIdGetter interface {
GetRunId() string
}
type startFunc func() (RunIdGetter, error)
func testStartWithMemo(t *testing.T) {
s := testcore.NewEnv(t)

func (s *WorkflowMemoTestSuite) TestStartWithMemo() {
id := "functional-start-with-memo-test"
wt := "functional-start-with-memo-test-type"
tl := "functional-start-with-memo-test-taskqueue"
Expand All @@ -60,18 +57,20 @@ func (s *WorkflowMemoTestSuite) TestStartWithMemo() {
Memo: memo,
}

fn := func() (RunIdGetter, error) {
fn := func() (RunIDGetter, error) {
return s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request)
}
s.startWithMemoHelper(fn, id, &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, memo, `
startWithMemoHelper(s, fn, id, &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, memo, `
1 WorkflowExecutionStarted {"Memo":{"Fields":{"Info":{"Data":"\"memo-value\""}}}}
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 WorkflowExecutionCompleted`)
}

func (s *WorkflowMemoTestSuite) TestSignalWithStartWithMemo() {
func testSignalWithStartWithMemo(t *testing.T) {
s := testcore.NewEnv(t)

id := "functional-signal-with-start-with-memo-test"
wt := "functional-signal-with-start-with-memo-test-type"
tl := "functional-signal-with-start-with-memo-test-taskqueue"
Expand Down Expand Up @@ -100,10 +99,10 @@ func (s *WorkflowMemoTestSuite) TestSignalWithStartWithMemo() {
Memo: memo,
}

fn := func() (RunIdGetter, error) {
fn := func() (RunIDGetter, error) {
return s.FrontendClient().SignalWithStartWorkflowExecution(testcore.NewContext(), request)
}
s.startWithMemoHelper(fn, id, &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, memo, `
startWithMemoHelper(s, fn, id, &taskqueuepb.TaskQueue{Name: tl, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, memo, `
1 WorkflowExecutionStarted {"Memo":{"Fields":{"Info":{"Data":"\"memo-value\""}}}}
2 WorkflowExecutionSignaled
3 WorkflowTaskScheduled
Expand All @@ -112,33 +111,23 @@ func (s *WorkflowMemoTestSuite) TestSignalWithStartWithMemo() {
6 WorkflowExecutionCompleted`)
}

// helper function for TestStartWithMemo and TestSignalWithStartWithMemo to reduce duplicate code
func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string, taskQueue *taskqueuepb.TaskQueue, memo *commonpb.Memo, expectedHistory string) {
identity := "worker1"

// helper function for testStartWithMemo and testSignalWithStartWithMemo to reduce duplicate code
func startWithMemoHelper(s *testcore.TestEnv, startFn startFunc, id string, taskQueue *taskqueuepb.TaskQueue, memo *commonpb.Memo, expectedHistory string) {
we, err0 := startFn()
s.NoError(err0)

s.Logger.Info("StartWorkflowExecution: response", tag.WorkflowRunID(we.GetRunId()))

wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) {
return &workflowservice.RespondWorkflowTaskCompletedRequest{
Commands: []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}},
}}, nil
}, nil
}

poller := &testcore.TaskPoller{
Client: s.FrontendClient(),
Namespace: s.Namespace().String(),
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.Logger,
T: s.T(),
}
tv := s.Tv().WithTaskQueue(taskQueue.Name)

// verify open visibility
var openExecutionInfo *workflowpb.WorkflowExecutionInfo
Expand All @@ -160,7 +149,6 @@ func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string
openExecutionInfo = resp.Executions[0]
return true
}
s.Logger.Info("Open WorkflowExecution is not yet visible")
return false
},
testcore.WaitForESToSettle,
Expand All @@ -184,8 +172,7 @@ func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string
s.ProtoEqual(memo, descResp.WorkflowExecutionInfo.Memo)

// make progress of workflow
_, err = poller.PollAndProcessWorkflowTask()
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
_, err = s.TaskPoller().PollAndHandleWorkflowTask(tv, wtHandler)
s.NoError(err)

// verify history
Expand Down Expand Up @@ -217,7 +204,6 @@ func (s *WorkflowMemoTestSuite) startWithMemoHelper(startFn startFunc, id string
closedExecutionInfo = resp.Executions[0]
return true
}
s.Logger.Info("Closed WorkflowExecution is not yet visible")
return false
},
testcore.WaitForESToSettle,
Expand Down
Loading