From 02c7efbed448c4a493b3a971e8e0e292e17c6d91 Mon Sep 17 00:00:00 2001 From: Zijian Date: Thu, 16 May 2024 20:12:21 -0700 Subject: [PATCH] Revert "Update task executor to handle WorkflowAlreadyCompletedError for signal and cancel workflow (#5956)" (#6026) This reverts commit d877674d86e9c3fbcae661f0cbc73a2d82b8929f. --- .../cross_cluster_source_task_executor.go | 10 ----- ...cross_cluster_source_task_executor_test.go | 2 +- .../task/transfer_active_task_executor.go | 22 ++------- .../transfer_active_task_executor_test.go | 45 +------------------ 4 files changed, 6 insertions(+), 73 deletions(-) diff --git a/service/history/task/cross_cluster_source_task_executor.go b/service/history/task/cross_cluster_source_task_executor.go index 059a9725411..b1c2dba7505 100644 --- a/service/history/task/cross_cluster_source_task_executor.go +++ b/service/history/task/cross_cluster_source_task_executor.go @@ -257,10 +257,6 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask( if failedCause != nil { // remaining errors are non-retryable - cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution - if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted { - cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted - } return requestCancelExternalExecutionFailed( ctx, taskInfo, @@ -269,7 +265,6 @@ func (t *crossClusterSourceTaskExecutor) executeCancelExecutionTask( taskInfo.TargetWorkflowID, taskInfo.TargetRunID, now, - cause, ) } return requestCancelExternalExecutionCompleted( @@ -484,10 +479,6 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask( if failedCause != nil { // remaining errors are non-retryable - cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution - if *failedCause == types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted { - cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted - } return signalExternalExecutionFailed( ctx, taskInfo, @@ -497,7 +488,6 @@ func (t *crossClusterSourceTaskExecutor) executeSignalExecutionTask( taskInfo.TargetRunID, signalInfo.Control, now, - cause, ) } diff --git a/service/history/task/cross_cluster_source_task_executor_test.go b/service/history/task/cross_cluster_source_task_executor_test.go index 78b7afa01b9..916e962d9ee 100644 --- a/service/history/task/cross_cluster_source_task_executor_test.go +++ b/service/history/task/cross_cluster_source_task_executor_test.go @@ -664,7 +664,7 @@ func (s *crossClusterSourceTaskExecutorSuite) TestExecuteCancelExecution_Failure &types.CrossClusterTaskResponse{ TaskType: types.CrossClusterTaskTypeCancelExecution.Ptr(), TaskState: int16(processingStateInitialized), - FailedCause: types.CrossClusterTaskFailedCauseWorkflowAlreadyCompleted.Ptr(), + FailedCause: types.CrossClusterTaskFailedCauseWorkflowNotExists.Ptr(), }, func( mutableState execution.MutableState, diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index f5babe3a49f..802bbd646c6 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -630,7 +630,6 @@ func (t *transferActiveTaskExecutor) processCancelExecution( task.TargetWorkflowID, task.TargetRunID, t.shard.GetTimeSource().Now(), - types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, ) return err } @@ -657,11 +656,6 @@ func (t *transferActiveTaskExecutor) processCancelExecution( // for retryable error just return return err } - cause := types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution - var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError - if errors.As(err, &alreadyCompletedErr) { - cause = types.CancelExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted - } return requestCancelExternalExecutionFailed( ctx, task, @@ -670,7 +664,6 @@ func (t *transferActiveTaskExecutor) processCancelExecution( task.TargetWorkflowID, task.TargetRunID, t.shard.GetTimeSource().Now(), - cause, ) } @@ -757,7 +750,6 @@ func (t *transferActiveTaskExecutor) processSignalExecution( task.TargetRunID, signalInfo.Control, t.shard.GetTimeSource().Now(), - types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, ) } @@ -777,17 +769,12 @@ func (t *transferActiveTaskExecutor) processSignalExecution( tag.TargetWorkflowRunID(task.TargetRunID), tag.Error(err)) - // Check to see if the error is non-transient, in which case add RequestCancelFailed + // Check to see if the error is non-transient, in which case add SignalFailed // event and complete transfer task by setting the err = nil if common.IsServiceTransientError(err) || common.IsContextTimeoutError(err) { // for retryable error just return return err } - var alreadyCompletedErr *types.WorkflowExecutionAlreadyCompletedError - cause := types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution - if errors.As(err, &alreadyCompletedErr) { - cause = types.SignalExternalWorkflowExecutionFailedCauseWorkflowAlreadyCompleted - } return signalExternalExecutionFailed( ctx, task, @@ -797,7 +784,6 @@ func (t *transferActiveTaskExecutor) processSignalExecution( task.TargetRunID, signalInfo.Control, t.shard.GetTimeSource().Now(), - cause, ) } @@ -1433,7 +1419,6 @@ func requestCancelExternalExecutionFailed( targetWorkflowID string, targetRunID string, now time.Time, - cause types.CancelExternalWorkflowExecutionFailedCause, ) error { err := updateWorkflowExecution(ctx, wfContext, true, @@ -1454,7 +1439,7 @@ func requestCancelExternalExecutionFailed( targetDomain, targetWorkflowID, targetRunID, - cause, + types.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, ) return err }, @@ -1479,7 +1464,6 @@ func signalExternalExecutionFailed( targetRunID string, control []byte, now time.Time, - cause types.SignalExternalWorkflowExecutionFailedCause, ) error { err := updateWorkflowExecution(ctx, wfContext, true, @@ -1501,7 +1485,7 @@ func signalExternalExecutionFailed( targetWorkflowID, targetRunID, control, - cause, + types.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution, ) return err }, diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index c4f81196eb1..3d8e54a3fb6 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -1046,7 +1046,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Success() { ) } -func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_EntityNotExistsError() { +func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Failure() { s.testProcessCancelExecution( s.targetDomainID, func( @@ -1067,27 +1067,6 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_EntityNotEx ) } -func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_WorkflowAlreadyCompleted() { - s.testProcessCancelExecution( - s.targetDomainID, - func( - mutableState execution.MutableState, - workflowExecution, targetExecution types.WorkflowExecution, - event *types.HistoryEvent, - transferTask Task, - requestCancelInfo *persistence.RequestCancelInfo, - ) { - persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version) - s.NoError(err) - s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - cancelRequest := createTestRequestCancelWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), requestCancelInfo.CancelRequestID) - s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), cancelRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1) - s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once() - s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once() - }, - ) -} - func (s *transferActiveTaskExecutorSuite) TestProcessCancelExecution_Duplication() { s.testProcessCancelExecution( s.targetDomainID, @@ -1223,7 +1202,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Success() { ) } -func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_EntityNotExistsError() { +func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Failure() { s.testProcessSignalExecution( s.targetDomainID, func( @@ -1244,26 +1223,6 @@ func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_EntityNotEx ) } -func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_WorkflowAlreadyCompletedError() { - s.testProcessSignalExecution( - s.targetDomainID, - func( - mutableState execution.MutableState, - workflowExecution, targetExecution types.WorkflowExecution, - event *types.HistoryEvent, - transferTask Task, - signalInfo *persistence.SignalInfo, - ) { - persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version) - s.NoError(err) - s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - signalRequest := createTestSignalWorkflowExecutionRequest(s.targetDomainName, transferTask.GetInfo().(*persistence.TransferTaskInfo), signalInfo) - s.mockHistoryClient.EXPECT().SignalWorkflowExecution(gomock.Any(), signalRequest).Return(&types.WorkflowExecutionAlreadyCompletedError{}).Times(1) - s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&persistence.AppendHistoryNodesResponse{}, nil).Once() - s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once() - }, - ) -} func (s *transferActiveTaskExecutorSuite) TestProcessSignalExecution_Duplication() { s.testProcessSignalExecution( s.targetDomainID,