From f7ad488f03da1b6bc10efbdb0d428e842ff19826 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Tue, 14 May 2019 12:19:46 -0700 Subject: [PATCH] Fix bug: signal workflow will trigger cron workflow execution (#1838) * Fix bug: signal workflow should not trigger cron workflow execution. #1595 --- common/persistence/versionHistory.go | 1 + host/signalworkflow_test.go | 79 +++++++++++++++++++ service/history/MockMutableState.go | 14 ++++ service/history/historyEngine.go | 74 +++++++++-------- service/history/mutableState.go | 1 + service/history/mutableStateBuilder.go | 4 + service/history/timerQueueActiveProcessor.go | 3 +- service/history/timerQueueStandbyProcessor.go | 3 +- 8 files changed, 143 insertions(+), 36 deletions(-) diff --git a/common/persistence/versionHistory.go b/common/persistence/versionHistory.go index 098a9c72bd7..12260e1d7ec 100644 --- a/common/persistence/versionHistory.go +++ b/common/persistence/versionHistory.go @@ -161,6 +161,7 @@ func (h *VersionHistories) AddHistory(item VersionHistoryItem, local VersionHist return nil } +// GetHistories returns the batch histories func (h *VersionHistories) GetHistories() []VersionHistory { return h.versionHistories } diff --git a/host/signalworkflow_test.go b/host/signalworkflow_test.go index f3faad0af8b..e00cfa4013b 100644 --- a/host/signalworkflow_test.go +++ b/host/signalworkflow_test.go @@ -586,6 +586,85 @@ CheckHistoryLoopForSignalSent: s.Equal("history-service", *signalEvent.WorkflowExecutionSignaledEventAttributes.Identity) } +func (s *integrationSuite) TestSignalWorkflow_Cron_NoDecisionTaskCreated() { + id := "integration-signal-workflow-test-cron" + wt := "integration-signal-workflow-test-cron-type" + tl := "integration-signal-workflow-test-cron-tasklist" + identity := "worker1" + cronSpec := "@every 2s" + + workflowType := &workflow.WorkflowType{} + workflowType.Name = common.StringPtr(wt) + + taskList := &workflow.TaskList{} + taskList.Name = common.StringPtr(tl) + + // Start workflow execution + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(id), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Identity: common.StringPtr(identity), + CronSchedule: &cronSpec, + } + now := time.Now() + + we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err0) + + s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(*we.RunId)) + + // Send first signal using RunID + signalName := "my signal" + signalInput := []byte("my signal input.") + err := s.engine.SignalWorkflowExecution(createContext(), &workflow.SignalWorkflowExecutionRequest{ + Domain: common.StringPtr(s.domainName), + WorkflowExecution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + RunId: common.StringPtr(*we.RunId), + }, + SignalName: common.StringPtr(signalName), + Input: signalInput, + Identity: common.StringPtr(identity), + }) + s.Nil(err) + + // decider logic + var decisionTaskDelay time.Duration + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { + decisionTaskDelay = time.Now().Sub(now) + + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), + CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Done."), + }, + }}, nil + } + + poller := &TaskPoller{ + Engine: s.engine, + Domain: s.domainName, + TaskList: taskList, + Identity: identity, + DecisionHandler: dtHandler, + Logger: s.Logger, + T: s.T(), + } + + // Make first decision to schedule activity + _, err = poller.PollAndProcessDecisionTask(false, false) + s.Logger.Info("PollAndProcessDecisionTask", tag.Error(err)) + s.Nil(err) + s.True(decisionTaskDelay > time.Second*2) +} + func (s *integrationSuite) TestSignalExternalWorkflowDecision_WithoutRunID() { id := "integration-signal-external-workflow-test-without-run-id" wt := "integration-signal-external-workflow-test-without-run-id-type" diff --git a/service/history/MockMutableState.go b/service/history/MockMutableState.go index 909d797bc37..461afd33ce8 100644 --- a/service/history/MockMutableState.go +++ b/service/history/MockMutableState.go @@ -1712,6 +1712,20 @@ func (_m *mockMutableState) HasPendingDecisionTask() bool { return r0 } +// HasProcessedOrPendingDecisionTask provides a mock function with given fields: +func (_m *mockMutableState) HasProcessedOrPendingDecisionTask() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // IncrementHistorySize provides a mock function with given fields: appendSize func (_m *mockMutableState) IncrementHistorySize(appendSize int) { _m.Called(appendSize) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 57a85798332..14a710a43f6 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2102,45 +2102,55 @@ func (e *historyEngineImpl) SignalWorkflowExecution(ctx ctx.Context, signalReque RunId: request.WorkflowExecution.RunId, } - return e.updateWorkflowExecution(ctx, domainID, execution, false, true, - func(msBuilder mutableState, tBuilder *timerBuilder) ([]persistence.Task, error) { - if !msBuilder.IsWorkflowExecutionRunning() { - return nil, ErrWorkflowCompleted - } + return e.updateWorkflowExecutionWithAction(ctx, domainID, execution, func(msBuilder mutableState, tBuilder *timerBuilder) (*updateWorkflowAction, error) { + executionInfo := msBuilder.GetExecutionInfo() + createDecisionTask := true + // Do not create decision task when the workflow is cron and the cron has not been started yet + if msBuilder.GetExecutionInfo().CronSchedule != "" && !msBuilder.HasProcessedOrPendingDecisionTask() { + createDecisionTask = false + } + postActions := &updateWorkflowAction{ + deleteWorkflow: false, + createDecision: createDecisionTask, + timerTasks: nil, + } - executionInfo := msBuilder.GetExecutionInfo() - maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name) - if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals { - e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount), - tag.WorkflowID(execution.GetWorkflowId()), - tag.WorkflowRunID(execution.GetRunId()), - tag.WorkflowDomainID(domainID)) - return nil, ErrSignalsLimitExceeded - } + if !msBuilder.IsWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted + } - if childWorkflowOnly { - parentWorkflowID := executionInfo.ParentWorkflowID - parentRunID := executionInfo.ParentRunID - if parentExecution.GetWorkflowId() != parentWorkflowID || - parentExecution.GetRunId() != parentRunID { - return nil, ErrWorkflowParent - } - } + maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name) + if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals { + e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount), + tag.WorkflowID(execution.GetWorkflowId()), + tag.WorkflowRunID(execution.GetRunId()), + tag.WorkflowDomainID(domainID)) + return nil, ErrSignalsLimitExceeded + } - // deduplicate by request id for signal decision - if requestID := request.GetRequestId(); requestID != "" { - if msBuilder.IsSignalRequested(requestID) { - return nil, nil - } - msBuilder.AddSignalRequested(requestID) + if childWorkflowOnly { + parentWorkflowID := executionInfo.ParentWorkflowID + parentRunID := executionInfo.ParentRunID + if parentExecution.GetWorkflowId() != parentWorkflowID || + parentExecution.GetRunId() != parentRunID { + return nil, ErrWorkflowParent } + } - if msBuilder.AddWorkflowExecutionSignaled(request.GetSignalName(), request.GetInput(), request.GetIdentity()) == nil { - return nil, &workflow.InternalServiceError{Message: "Unable to signal workflow execution."} + // deduplicate by request id for signal decision + if requestID := request.GetRequestId(); requestID != "" { + if msBuilder.IsSignalRequested(requestID) { + return postActions, nil } + msBuilder.AddSignalRequested(requestID) + } - return nil, nil - }) + if msBuilder.AddWorkflowExecutionSignaled(request.GetSignalName(), request.GetInput(), request.GetIdentity()) == nil { + return nil, &workflow.InternalServiceError{Message: "Unable to signal workflow execution."} + } + + return postActions, nil + }) } func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx ctx.Context, signalWithStartRequest *h.SignalWithStartWorkflowExecutionRequest) ( diff --git a/service/history/mutableState.go b/service/history/mutableState.go index a8880562c63..3f038c47695 100644 --- a/service/history/mutableState.go +++ b/service/history/mutableState.go @@ -145,6 +145,7 @@ type ( HasInFlightDecisionTask() bool HasParentExecution() bool HasPendingDecisionTask() bool + HasProcessedOrPendingDecisionTask() bool IncrementHistorySize(int) IsCancelRequested() (bool, string) IsSignalRequested(requestID string) bool diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index af50fc98655..8a8deaf0503 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -1142,6 +1142,10 @@ func (e *mutableStateBuilder) HasPendingDecisionTask() bool { return e.executionInfo.DecisionScheduleID != common.EmptyEventID } +func (e *mutableStateBuilder) HasProcessedOrPendingDecisionTask() bool { + return e.HasPendingDecisionTask() || e.GetPreviousStartedEventID() != common.EmptyEventID +} + func (e *mutableStateBuilder) HasInFlightDecisionTask() bool { return e.executionInfo.DecisionStartedID > 0 } diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index d370acd8c29..fe70fc4bf13 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -573,8 +573,7 @@ Update_History_Loop: return nil } - if msBuilder.GetPreviousStartedEventID() != common.EmptyEventID || - msBuilder.HasPendingDecisionTask() { + if msBuilder.HasProcessedOrPendingDecisionTask() { // already has decision task return nil } diff --git a/service/history/timerQueueStandbyProcessor.go b/service/history/timerQueueStandbyProcessor.go index ca97fa3f236..fb22196f398 100644 --- a/service/history/timerQueueStandbyProcessor.go +++ b/service/history/timerQueueStandbyProcessor.go @@ -384,8 +384,7 @@ func (t *timerQueueStandbyProcessorImpl) processWorkflowBackoffTimer(timerTask * return t.processTimer(timerTask, func(context workflowExecutionContext, msBuilder mutableState) error { - if msBuilder.GetPreviousStartedEventID() != common.EmptyEventID || - msBuilder.HasPendingDecisionTask() { + if msBuilder.HasProcessedOrPendingDecisionTask() { // if there is one decision already been processed // or has pending decision, meaning workflow has already running return nil