Skip to content

Commit

Permalink
Delete workflow execution on workflow time out. (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
sivakku authored Jun 30, 2017
1 parent 9971c83 commit 6d8ad64
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 24 deletions.
25 changes: 25 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2449,6 +2449,8 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() {
}

func (s *integrationSuite) TestWorkflowTimeout() {
startTime := time.Now().UnixNano()

id := "integration-workflow-timeout-test"
wt := "integration-workflow-timeout-type"
tl := "integration-workflow-timeout-tasklist"
Expand Down Expand Up @@ -2505,6 +2507,29 @@ GetHistoryLoop:
break GetHistoryLoop
}
s.True(workflowComplete)

startFilter := workflow.NewStartTimeFilter()
startFilter.EarliestTime = common.Int64Ptr(startTime)
startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano())

closedCount := 0
ListClosedLoop:
for i := 0; i < 10; i++ {
resp, err3 := s.engine.ListClosedWorkflowExecutions(&workflow.ListClosedWorkflowExecutionsRequest{
Domain: common.StringPtr(s.domainName),
MaximumPageSize: common.Int32Ptr(100),
StartTimeFilter: startFilter,
})
s.Nil(err3)
closedCount = len(resp.Executions)
if closedCount == 0 {
s.logger.Info("Closed WorkflowExecution is not yet visibile")
time.Sleep(100 * time.Millisecond)
continue ListClosedLoop
}
break ListClosedLoop
}
s.Equal(1, closedCount)
}

func (s *integrationSuite) setupShards() {
Expand Down
53 changes: 36 additions & 17 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,21 +863,12 @@ Update_History_Loop:
}

if isComplete {
// Generate a transfer task to delete workflow execution
transferTasks = append(transferTasks, &persistence.DeleteExecutionTask{})

// Generate a timer task to cleanup history events for this workflow execution
var retentionInDays int32
_, domainConfig, err := e.domainCache.GetDomainByID(domainID)
tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, context)
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
return err
}
} else {
retentionInDays = domainConfig.Retention
return err
}
cleanupTask := context.tBuilder.createDeleteHistoryEventTimerTask(time.Duration(retentionInDays) * time.Hour * 24)
timerTasks = append(timerTasks, cleanupTask)
transferTasks = append(transferTasks, tranT)
timerTasks = append(timerTasks, timerT)
}

// Generate a transaction ID for appending events to history
Expand Down Expand Up @@ -1406,14 +1397,19 @@ Update_History_Loop:
return err1
}

var transferTasks []persistence.Task
if err := action(msBuilder); err != nil {
return err
}

var transferTasks []persistence.Task
var timerTasks []persistence.Task
if createDeletionTask {
// Create a transfer task to delete workflow execution
transferTasks = append(transferTasks, &persistence.DeleteExecutionTask{})
tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, context)
if err != nil {
return err
}
transferTasks = append(transferTasks, tranT)
timerTasks = append(timerTasks, timerT)
}

if createDecisionTask {
Expand All @@ -1436,7 +1432,7 @@ Update_History_Loop:

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
// the history and try the operation again.
if err := context.updateWorkflowExecution(transferTasks, nil, transactionID); err != nil {
if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil {
if err == ErrConflict {
continue Update_History_Loop
}
Expand All @@ -1447,6 +1443,29 @@ Update_History_Loop:
return ErrMaxAttemptsExceeded
}

func (e *historyEngineImpl) getDeleteWorkflowTasks(
domainID string,
context *workflowExecutionContext,
) (persistence.Task, persistence.Task, error) {

// Create a transfer task to delete workflow execution
deleteTask := &persistence.DeleteExecutionTask{}

// Generate a timer task to cleanup history events for this workflow execution
var retentionInDays int32
_, domainConfig, err := e.domainCache.GetDomainByID(domainID)
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
return nil, nil, err
}
} else {
retentionInDays = domainConfig.Retention
}
cleanupTask := context.tBuilder.createDeleteHistoryEventTimerTask(time.Duration(retentionInDays) * time.Hour * 24)

return deleteTask, cleanupTask, nil
}

func (e *historyEngineImpl) createRecordDecisionTaskStartedResponse(domainID string, msBuilder *mutableStateBuilder,
startedEventID int64) *h.RecordDecisionTaskStartedResponse {
response := h.NewRecordDecisionTaskStartedResponse()
Expand Down
28 changes: 21 additions & 7 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ Update_History_Loop:

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
// the history and try the operation again.
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, timerTasks, nil)
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, timerTasks, nil)
if err != nil {
if err == ErrConflict {
continue Update_History_Loop
Expand Down Expand Up @@ -667,7 +667,7 @@ Update_History_Loop:
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
// the history and try the operation again.
defer t.NotifyNewTimer(timerTasks)
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, timerTasks, nil)
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, timerTasks, nil)
if err != nil {
if err == ErrConflict {
continue Update_History_Loop
Expand Down Expand Up @@ -744,7 +744,7 @@ Update_History_Loop:
if scheduleNewDecision {
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
// the history and try the operation again.
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, nil, nil)
err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, nil, nil)
if err != nil {
if err == ErrConflict {
continue Update_History_Loop
Expand Down Expand Up @@ -789,7 +789,7 @@ Update_History_Loop:

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload
// the history and try the operation again.
err := t.updateWorkflowExecution(context, msBuilder, false, nil, nil)
err := t.updateWorkflowExecution(context, msBuilder, false, true, nil, nil)
if err != nil {
if err == ErrConflict {
continue Update_History_Loop
Expand All @@ -800,9 +800,14 @@ Update_History_Loop:
return ErrMaxAttemptsExceeded
}

func (t *timerQueueProcessorImpl) updateWorkflowExecution(context *workflowExecutionContext,
msBuilder *mutableStateBuilder, scheduleNewDecision bool, timerTasks []persistence.Task,
clearTimerTask persistence.Task) error {
func (t *timerQueueProcessorImpl) updateWorkflowExecution(
context *workflowExecutionContext,
msBuilder *mutableStateBuilder,
scheduleNewDecision bool,
createDeletionTask bool,
timerTasks []persistence.Task,
clearTimerTask persistence.Task,
) error {
var transferTasks []persistence.Task
if scheduleNewDecision {
// Schedule a new decision.
Expand All @@ -814,6 +819,15 @@ func (t *timerQueueProcessorImpl) updateWorkflowExecution(context *workflowExecu
}}
}

if createDeletionTask {
tranT, timerT, err := t.historyService.getDeleteWorkflowTasks(msBuilder.executionInfo.DomainID, context)
if err != nil {
return nil
}
transferTasks = append(transferTasks, tranT)
timerTasks = append(timerTasks, timerT)
}

// Generate a transaction ID for appending events to history
transactionID, err1 := t.historyService.shard.GetNextTransferTaskID()
if err1 != nil {
Expand Down
4 changes: 4 additions & 0 deletions service/history/timerQueueProcessor2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() {
s.mockShardManager = &mocks.ShardManager{}
s.mockHistoryMgr = &mocks.HistoryManager{}
s.mockVisibilityMgr = &mocks.VisibilityManager{}
s.mockMetadataMgr = &mocks.MetadataManager{}
s.shardClosedCh = make(chan int, 100)

s.mockShard = &shardContextImpl{
Expand Down Expand Up @@ -109,6 +110,7 @@ func (s *timerQueueProcessor2Suite) SetupTest() {
tokenSerializer: common.NewJSONTaskTokenSerializer(),
hSerializerFactory: persistence.NewHistorySerializerFactory(),
metricsClient: s.mockShard.GetMetricsClient(),
domainCache: domainCache,
}
h.timerProcessor = newTimerQueueProcessor(s.mockShard, h, s.mockExecutionMgr, s.logger)
s.mockHistoryEngine = h
Expand Down Expand Up @@ -219,6 +221,8 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
wfResponse := &persistence.GetWorkflowExecutionResponse{State: ms}
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(wfResponse, nil).Once()

s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&persistence.GetDomainResponse{Config: &persistence.DomainConfig{Retention: 1}}, nil).Once()
s.mockExecutionMgr.On("CompleteTimerTask", mock.Anything).Return(nil).Once()
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Run(func(arguments mock.Arguments) {
Expand Down

0 comments on commit 6d8ad64

Please sign in to comment.