Skip to content

Commit

Permalink
Fix bug: signal workflow will trigger cron workflow execution (#1838)
Browse files Browse the repository at this point in the history
* Fix bug: signal workflow should not trigger cron workflow execution.
#1595
  • Loading branch information
yux0 authored and longquanzheng committed May 14, 2019
1 parent 3f94b7b commit f7ad488
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 36 deletions.
1 change: 1 addition & 0 deletions common/persistence/versionHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (h *VersionHistories) AddHistory(item VersionHistoryItem, local VersionHist
return nil
}

// GetHistories returns the batch histories
func (h *VersionHistories) GetHistories() []VersionHistory {
return h.versionHistories
}
79 changes: 79 additions & 0 deletions host/signalworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,85 @@ CheckHistoryLoopForSignalSent:
s.Equal("history-service", *signalEvent.WorkflowExecutionSignaledEventAttributes.Identity)
}

func (s *integrationSuite) TestSignalWorkflow_Cron_NoDecisionTaskCreated() {
id := "integration-signal-workflow-test-cron"
wt := "integration-signal-workflow-test-cron-type"
tl := "integration-signal-workflow-test-cron-tasklist"
identity := "worker1"
cronSpec := "@every 2s"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

// Start workflow execution
request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
CronSchedule: &cronSpec,
}
now := time.Now()

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(*we.RunId))

// Send first signal using RunID
signalName := "my signal"
signalInput := []byte("my signal input.")
err := s.engine.SignalWorkflowExecution(createContext(), &workflow.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(s.domainName),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(*we.RunId),
},
SignalName: common.StringPtr(signalName),
Input: signalInput,
Identity: common.StringPtr(identity),
})
s.Nil(err)

// decider logic
var decisionTaskDelay time.Duration
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
decisionTaskDelay = time.Now().Sub(now)

return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

poller := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
Logger: s.Logger,
T: s.T(),
}

// Make first decision to schedule activity
_, err = poller.PollAndProcessDecisionTask(false, false)
s.Logger.Info("PollAndProcessDecisionTask", tag.Error(err))
s.Nil(err)
s.True(decisionTaskDelay > time.Second*2)
}

func (s *integrationSuite) TestSignalExternalWorkflowDecision_WithoutRunID() {
id := "integration-signal-external-workflow-test-without-run-id"
wt := "integration-signal-external-workflow-test-without-run-id-type"
Expand Down
14 changes: 14 additions & 0 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,6 +1712,20 @@ func (_m *mockMutableState) HasPendingDecisionTask() bool {
return r0
}

// HasProcessedOrPendingDecisionTask provides a mock function with given fields:
func (_m *mockMutableState) HasProcessedOrPendingDecisionTask() bool {
ret := _m.Called()

var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}

return r0
}

// IncrementHistorySize provides a mock function with given fields: appendSize
func (_m *mockMutableState) IncrementHistorySize(appendSize int) {
_m.Called(appendSize)
Expand Down
74 changes: 42 additions & 32 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2102,45 +2102,55 @@ func (e *historyEngineImpl) SignalWorkflowExecution(ctx ctx.Context, signalReque
RunId: request.WorkflowExecution.RunId,
}

return e.updateWorkflowExecution(ctx, domainID, execution, false, true,
func(msBuilder mutableState, tBuilder *timerBuilder) ([]persistence.Task, error) {
if !msBuilder.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
}
return e.updateWorkflowExecutionWithAction(ctx, domainID, execution, func(msBuilder mutableState, tBuilder *timerBuilder) (*updateWorkflowAction, error) {
executionInfo := msBuilder.GetExecutionInfo()
createDecisionTask := true
// Do not create decision task when the workflow is cron and the cron has not been started yet
if msBuilder.GetExecutionInfo().CronSchedule != "" && !msBuilder.HasProcessedOrPendingDecisionTask() {
createDecisionTask = false
}
postActions := &updateWorkflowAction{
deleteWorkflow: false,
createDecision: createDecisionTask,
timerTasks: nil,
}

executionInfo := msBuilder.GetExecutionInfo()
maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name)
if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals {
e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount),
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
tag.WorkflowDomainID(domainID))
return nil, ErrSignalsLimitExceeded
}
if !msBuilder.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
}

if childWorkflowOnly {
parentWorkflowID := executionInfo.ParentWorkflowID
parentRunID := executionInfo.ParentRunID
if parentExecution.GetWorkflowId() != parentWorkflowID ||
parentExecution.GetRunId() != parentRunID {
return nil, ErrWorkflowParent
}
}
maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name)
if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals {
e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount),
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
tag.WorkflowDomainID(domainID))
return nil, ErrSignalsLimitExceeded
}

// deduplicate by request id for signal decision
if requestID := request.GetRequestId(); requestID != "" {
if msBuilder.IsSignalRequested(requestID) {
return nil, nil
}
msBuilder.AddSignalRequested(requestID)
if childWorkflowOnly {
parentWorkflowID := executionInfo.ParentWorkflowID
parentRunID := executionInfo.ParentRunID
if parentExecution.GetWorkflowId() != parentWorkflowID ||
parentExecution.GetRunId() != parentRunID {
return nil, ErrWorkflowParent
}
}

if msBuilder.AddWorkflowExecutionSignaled(request.GetSignalName(), request.GetInput(), request.GetIdentity()) == nil {
return nil, &workflow.InternalServiceError{Message: "Unable to signal workflow execution."}
// deduplicate by request id for signal decision
if requestID := request.GetRequestId(); requestID != "" {
if msBuilder.IsSignalRequested(requestID) {
return postActions, nil
}
msBuilder.AddSignalRequested(requestID)
}

return nil, nil
})
if msBuilder.AddWorkflowExecutionSignaled(request.GetSignalName(), request.GetInput(), request.GetIdentity()) == nil {
return nil, &workflow.InternalServiceError{Message: "Unable to signal workflow execution."}
}

return postActions, nil
})
}

func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx ctx.Context, signalWithStartRequest *h.SignalWithStartWorkflowExecutionRequest) (
Expand Down
1 change: 1 addition & 0 deletions service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ type (
HasInFlightDecisionTask() bool
HasParentExecution() bool
HasPendingDecisionTask() bool
HasProcessedOrPendingDecisionTask() bool
IncrementHistorySize(int)
IsCancelRequested() (bool, string)
IsSignalRequested(requestID string) bool
Expand Down
4 changes: 4 additions & 0 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,10 @@ func (e *mutableStateBuilder) HasPendingDecisionTask() bool {
return e.executionInfo.DecisionScheduleID != common.EmptyEventID
}

func (e *mutableStateBuilder) HasProcessedOrPendingDecisionTask() bool {
return e.HasPendingDecisionTask() || e.GetPreviousStartedEventID() != common.EmptyEventID
}

func (e *mutableStateBuilder) HasInFlightDecisionTask() bool {
return e.executionInfo.DecisionStartedID > 0
}
Expand Down
3 changes: 1 addition & 2 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,7 @@ Update_History_Loop:
return nil
}

if msBuilder.GetPreviousStartedEventID() != common.EmptyEventID ||
msBuilder.HasPendingDecisionTask() {
if msBuilder.HasProcessedOrPendingDecisionTask() {
// already has decision task
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions service/history/timerQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,7 @@ func (t *timerQueueStandbyProcessorImpl) processWorkflowBackoffTimer(timerTask *

return t.processTimer(timerTask, func(context workflowExecutionContext, msBuilder mutableState) error {

if msBuilder.GetPreviousStartedEventID() != common.EmptyEventID ||
msBuilder.HasPendingDecisionTask() {
if msBuilder.HasProcessedOrPendingDecisionTask() {
// if there is one decision already been processed
// or has pending decision, meaning workflow has already running
return nil
Expand Down

0 comments on commit f7ad488

Please sign in to comment.