Skip to content

Commit

Permalink
Separate transient decision task completion replication message to 2,…
Browse files Browse the repository at this point in the history
… guarantee the 1 to 1 mapping between event batch and replication message

Handle transient decision special case in mutable state
Add corresponding UT
  • Loading branch information
Wenquan Xing authored and longquanzheng committed May 14, 2019
1 parent 767dfbf commit b70a67c
Show file tree
Hide file tree
Showing 9 changed files with 365 additions and 105 deletions.
16 changes: 0 additions & 16 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,22 +933,6 @@ func (_m *mockMutableState) CreateNewHistoryEventWithTimestamp(eventType shared.
return r0
}

// CreateReplicationTask provides a mock function with given fields:
func (_m *mockMutableState) CreateReplicationTask(_a0 []persistence.Task, _a1 int32, _a2 []byte) []persistence.Task {
ret := _m.Called(_a0, _a1)

var r0 []persistence.Task
if rf, ok := ret.Get(0).(func(_a0 []persistence.Task, _a1 int32, _a2 []byte) []persistence.Task); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]persistence.Task)
}
}

return r0
}

// CreateActivityRetryTimer provides a mock function with given fields: _a0, _a1
func (_m *mockMutableState) CreateActivityRetryTimer(_a0 *persistence.ActivityInfo, _a1 string) persistence.Task {
ret := _m.Called(_a0, _a1)
Expand Down
54 changes: 36 additions & 18 deletions service/history/MockWorkflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,36 @@ type mockWorkflowExecutionContext struct {

var _ workflowExecutionContext = (*mockWorkflowExecutionContext)(nil)

func (_m *mockWorkflowExecutionContext) appendFirstBatchEventsForActive(_a0 mutableState) (int, error) {
ret := _m.Called(_a0)
func (_m *mockWorkflowExecutionContext) appendFirstBatchEventsForActive(_a0 mutableState, _a1 bool) (int, persistence.Task, error) {
ret := _m.Called(_a0, _a1)

var r0 int
if rf, ok := ret.Get(0).(func(mutableState) int); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(mutableState, bool) int); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(int)
}

var r1 error
if rf, ok := ret.Get(1).(func(mutableState) error); ok {
r1 = rf(_a0)
var r1 persistence.Task
if rf, ok := ret.Get(1).(func(mutableState, bool) persistence.Task); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
if ret.Get(1) != nil {
r1 = ret.Get(1).(persistence.Task)
}
}

return r0, r1
var r2 error
if rf, ok := ret.Get(2).(func(mutableState, bool) error); ok {
r2 = rf(_a0, _a1)
} else {
r2 = ret.Error(2)
}

return r0, r1, r2
}

func (_m *mockWorkflowExecutionContext) appendFirstBatchEventsForStandby(_a0 mutableState, _a1 []*workflow.HistoryEvent) (int, error) {
func (_m *mockWorkflowExecutionContext) appendFirstBatchEventsForStandby(_a0 mutableState, _a1 []*workflow.HistoryEvent) (int, persistence.Task, error) {
ret := _m.Called(_a0, _a1)

var r0 int
Expand All @@ -69,14 +78,23 @@ func (_m *mockWorkflowExecutionContext) appendFirstBatchEventsForStandby(_a0 mut
r0 = ret.Get(0).(int)
}

var r1 error
if rf, ok := ret.Get(1).(func(mutableState, []*workflow.HistoryEvent) error); ok {
var r1 persistence.Task
if rf, ok := ret.Get(1).(func(mutableState, []*workflow.HistoryEvent) persistence.Task); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
if ret.Get(1) != nil {
r1 = ret.Get(1).(persistence.Task)
}
}

return r0, r1
var r2 error
if rf, ok := ret.Get(2).(func(mutableState, []*workflow.HistoryEvent) error); ok {
r2 = rf(_a0, _a1)
} else {
r2 = ret.Error(2)
}

return r0, r1, r2
}

func (_m *mockWorkflowExecutionContext) clear() {
Expand All @@ -96,13 +114,13 @@ func (_m *mockWorkflowExecutionContext) continueAsNewWorkflowExecution(_a0 []byt
return r0
}

func (_m *mockWorkflowExecutionContext) createWorkflowExecution(_a0 mutableState, _a1 string, _a2 bool, _a3 time.Time, _a4 []persistence.Task, _a5 []persistence.Task, _a6 int, _a7 string, _a8 int64) error {
func (_m *mockWorkflowExecutionContext) createWorkflowExecution(_a0 mutableState, _a1 string, _a2 bool, _a3 time.Time, _a4 []persistence.Task, _a5 []persistence.Task, _a6 []persistence.Task, _a7 int, _a8 string, _a9 int64) error {

ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8)
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8, _a9)

var r0 error
if rf, ok := ret.Get(0).(func(mutableState, string, bool, time.Time, []persistence.Task, []persistence.Task, int, string, int64) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8)
if rf, ok := ret.Get(0).(func(mutableState, string, bool, time.Time, []persistence.Task, []persistence.Task, []persistence.Task, int, string, int64) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8, _a9)
} else {
r0 = ret.Error(0)
}
Expand Down
20 changes: 15 additions & 5 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,15 @@ func (e *historyEngineImpl) StartWorkflowExecution(ctx ctx.Context, startRequest

context := newWorkflowExecutionContext(domainID, execution, e.shard, e.executionManager, e.logger)
createReplicationTask := domainEntry.CanReplicateEvent()
_, retError = context.appendFirstBatchEventsForActive(msBuilder)
replicationTasks := []persistence.Task{}
var replicationTask persistence.Task
_, replicationTask, retError = context.appendFirstBatchEventsForActive(msBuilder, createReplicationTask)
if retError != nil {
return
}
if replicationTask != nil {
replicationTasks = append(replicationTasks, replicationTask)
}

// delete history if createWorkflow failed, otherwise history will leak
shouldDeleteHistory := true
Expand All @@ -438,7 +443,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(ctx ctx.Context, startRequest
prevLastWriteVersion := int64(0)
retError = context.createWorkflowExecution(
msBuilder, e.currentClusterName, createReplicationTask, time.Now(),
transferTasks, timerTasks,
transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)
if retError != nil {
Expand Down Expand Up @@ -470,7 +475,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(ctx ctx.Context, startRequest
}
retError = context.createWorkflowExecution(
msBuilder, e.currentClusterName, createReplicationTask, time.Now(),
transferTasks, timerTasks,
transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)
}
Expand Down Expand Up @@ -2307,10 +2312,15 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx ctx.Context, si

context = newWorkflowExecutionContext(domainID, execution, e.shard, e.executionManager, e.logger)
createReplicationTask := domainEntry.CanReplicateEvent()
_, retError = context.appendFirstBatchEventsForActive(msBuilder)
replicationTasks := []persistence.Task{}
var replicationTask persistence.Task
_, replicationTask, retError = context.appendFirstBatchEventsForActive(msBuilder, createReplicationTask)
if retError != nil {
return
}
if replicationTask != nil {
replicationTasks = append(replicationTasks, replicationTask)
}

// delete history if createWorkflow failed, otherwise history will leak
shouldDeleteHistory := true
Expand All @@ -2330,7 +2340,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx ctx.Context, si
}
retError = context.createWorkflowExecution(
msBuilder, e.currentClusterName, createReplicationTask, time.Now(),
transferTasks, timerTasks,
transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)

Expand Down
9 changes: 5 additions & 4 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,14 +679,15 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx ctx.Context, context wo
executionInfo.SetLastFirstEventID(firstEvent.GetEventId())
executionInfo.SetNextEventID(lastEvent.GetEventId() + 1)

_, err := context.appendFirstBatchEventsForStandby(msBuilder, history.Events)
_, _, err := context.appendFirstBatchEventsForStandby(msBuilder, history.Events)
if err != nil {
return err
}

// workflow passive side logic should not generate any replication task
createReplicationTask := false
transferTasks := sBuilder.getTransferTasks()
var replicationTasks []persistence.Task // passive side generates no replication tasks
timerTasks := sBuilder.getTimerTasks()
now := time.Unix(0, lastEvent.GetTimestamp())

Expand All @@ -711,7 +712,7 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx ctx.Context, context wo
prevRunID := ""
prevLastWriteVersion := int64(0)
err = context.createWorkflowExecution(
msBuilder, sourceCluster, createReplicationTask, now, transferTasks, timerTasks,
msBuilder, sourceCluster, createReplicationTask, now, transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)
if err == nil {
Expand Down Expand Up @@ -744,7 +745,7 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx ctx.Context, context wo
prevRunID = currentRunID
prevLastWriteVersion = currentLastWriteVersion
return context.createWorkflowExecution(
msBuilder, sourceCluster, createReplicationTask, now, transferTasks, timerTasks,
msBuilder, sourceCluster, createReplicationTask, now, transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)
}
Expand Down Expand Up @@ -794,7 +795,7 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx ctx.Context, context wo
prevRunID = currentRunID
prevLastWriteVersion = incomingVersion
return context.createWorkflowExecution(
msBuilder, sourceCluster, createReplicationTask, now, transferTasks, timerTasks,
msBuilder, sourceCluster, createReplicationTask, now, transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)
}
Expand Down
9 changes: 5 additions & 4 deletions service/history/historyReplicatorV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,15 @@ func (r *historyReplicatorV2) applyStartEvents(ctx ctx.Context, context workflow
return err
}

_, err = context.appendFirstBatchEventsForStandby(msBuilder, task.getEvents())
_, _, err = context.appendFirstBatchEventsForStandby(msBuilder, task.getEvents())
if err != nil {
return err
}

// workflow passive side logic should not generate any replication task
createReplicationTask := false
transferTasks := sBuilder.getTransferTasks()
var replicationTasks []persistence.Task // passive side generates no replication tasks
timerTasks := sBuilder.getTimerTasks()
defer func() {
if retError == nil {
Expand All @@ -209,7 +210,7 @@ func (r *historyReplicatorV2) applyStartEvents(ctx ctx.Context, context workflow
prevRunID := ""
prevLastWriteVersion := int64(0)
err = context.createWorkflowExecution(
msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, timerTasks,
msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)
if err == nil {
Expand Down Expand Up @@ -238,7 +239,7 @@ func (r *historyReplicatorV2) applyStartEvents(ctx ctx.Context, context workflow
prevRunID = currentRunID
prevLastWriteVersion = currentLastWriteVersion
return context.createWorkflowExecution(
msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, timerTasks,
msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)
}
Expand Down Expand Up @@ -294,7 +295,7 @@ func (r *historyReplicatorV2) applyStartEvents(ctx ctx.Context, context workflow
prevRunID = currentRunID
prevLastWriteVersion = task.getVersion()
return context.createWorkflowExecution(
msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, timerTasks,
msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, replicationTasks, timerTasks,
createMode, prevRunID, prevLastWriteVersion,
)
}
Expand Down
1 change: 0 additions & 1 deletion service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ type (
CreateActivityRetryTimer(*persistence.ActivityInfo, string) persistence.Task
CreateNewHistoryEvent(eventType workflow.EventType) *workflow.HistoryEvent
CreateNewHistoryEventWithTimestamp(eventType workflow.EventType, timestamp int64) *workflow.HistoryEvent
CreateReplicationTask([]persistence.Task, int32, []byte) []persistence.Task
CreateTransientDecisionEvents(di *decisionInfo, identity string) (*workflow.HistoryEvent, *workflow.HistoryEvent)
DeleteActivity(int64) error
DeleteBufferedReplicationTask(int64)
Expand Down
26 changes: 9 additions & 17 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,23 +499,6 @@ func (e *mutableStateBuilder) DeleteBufferedReplicationTask(firstEventID int64)
e.deleteBufferedReplicationEvent = common.Int64Ptr(firstEventID)
}

func (e *mutableStateBuilder) CreateReplicationTask(replicationTask []persistence.Task, newRunEventStoreVersion int32, newRunBranchToken []byte) []persistence.Task {
if e.replicationState == nil {
return replicationTask
}

return append(replicationTask, &persistence.HistoryReplicationTask{
FirstEventID: e.GetLastFirstEventID(),
NextEventID: e.GetNextEventID(),
Version: e.replicationState.CurrentVersion,
LastReplicationInfo: e.replicationState.LastReplicationInfo,
EventStoreVersion: e.GetEventStoreVersion(),
BranchToken: e.GetCurrentBranch(),
NewRunEventStoreVersion: newRunEventStoreVersion,
NewRunBranchToken: newRunBranchToken,
})
}

func (e *mutableStateBuilder) checkAndClearTimerFiredEvent(timerID string) *workflow.HistoryEvent {
var timerEvent *workflow.HistoryEvent
e.bufferedEvents, timerEvent = checkAndClearTimerFiredEvent(e.bufferedEvents, timerID)
Expand Down Expand Up @@ -1590,6 +1573,15 @@ func (e *mutableStateBuilder) ReplicateDecisionTaskStartedEvent(di *decisionInfo
// does not have to deal with transient decision case.
if di == nil {
di, _ = e.GetPendingDecision(scheduleID)
// setting decision attempt to 0 for decision task replication
// this mainly handles transient decision completion
// for transient decision, active side will write 2 batch in a "transaction"
// 1. decision task scheduled & decision task started
// 2. decision task completed & other events
// since we need to treat each individual event batch as one transaction
// certain "magic" needs to be done, i.e. setting attempt to 0 so
// if first batch is replicated, but not the second one, decision can be correctly timed out
di.Attempt = 0
}

e.executionInfo.State = persistence.WorkflowStateRunning
Expand Down
Loading

0 comments on commit b70a67c

Please sign in to comment.