Skip to content

Commit 8e121e2

Browse files
authored
expose is replaying flag (#419)
* expose is replay flag * adding more warnings
1 parent 2f5e5bf commit 8e121e2

File tree

5 files changed

+44
-0
lines changed

5 files changed

+44
-0
lines changed

internal/internal_event_handlers.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,10 @@ func (wc *workflowEnvironmentImpl) GetMetricsScope() tally.Scope {
315315
return wc.metricsScope
316316
}
317317

318+
func (wc *workflowEnvironmentImpl) IsReplaying() bool {
319+
return wc.isReplay
320+
}
321+
318322
func (wc *workflowEnvironmentImpl) GenerateSequenceID() string {
319323
return fmt.Sprintf("%d", wc.GenerateSequence())
320324
}

internal/internal_worker_base.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type (
6969
RegisterSignalHandler(handler func(name string, input []byte))
7070
SignalExternalWorkflow(domainName, workflowID, runID, signalName string, input []byte, arg interface{}, childWorkflowOnly bool, callback resultHandler)
7171
RegisterQueryHandler(handler func(queryType string, queryArgs []byte) ([]byte, error))
72+
IsReplaying() bool
7273
}
7374

7475
// WorkflowDefinition wraps the code that can execute a workflow.

internal/internal_workflow_testsuite.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,11 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelExternalWorkflow(domainName
12661266
}()
12671267
}
12681268

1269+
func (env *testWorkflowEnvironmentImpl) IsReplaying() bool {
1270+
// this test environment never replay
1271+
return false
1272+
}
1273+
12691274
func (env *testWorkflowEnvironmentImpl) SignalExternalWorkflow(domainName, workflowID, runID, signalName string, input []byte, arg interface{}, childWorkflowOnly bool, callback resultHandler) {
12701275
// check if target workflow is a known workflow
12711276
if childHandle, ok := env.runningWorkflows[workflowID]; ok {

internal/workflow.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -890,3 +890,20 @@ func SetQueryHandler(ctx Context, queryType string, handler interface{}) error {
890890
}
891891
return setQueryHandler(ctx, queryType, handler)
892892
}
893+
894+
// IsReplaying returns whether the current workflow code is replaying.
895+
//
896+
// Warning! Never make decisions, like schedule activity/childWorkflow/timer or send/wait on future/channel, based on
897+
// this flag as it is going to break workflow determinism requirement.
898+
// The only reasonable use case for this flag is to avoid some external actions during replay, like custom logging or
899+
// metric reporting. Please note that Cadence already provide standard logging/metric via workflow.GetLogger(ctx) and
900+
// workflow.GetMetricsScope(ctx), and those standard mechanism are replay-aware and it will automatically suppress during
901+
// replay. Only use this flag if you need custom logging/metrics reporting, for example if you want to log to kafka.
902+
//
903+
// Warning! Any action protected by this flag should not fail or if it does fail should ignore that failure or panic
904+
// on the failure. If workflow don't want to be blocked on those failure, it should ignore those failure; if workflow do
905+
// want to make sure it proceed only when that action succeed then it should panic on that failure. Panic raised from a
906+
// workflow causes decision task to fail and cadence server will rescheduled later to retry.
907+
func IsReplaying(ctx Context) bool {
908+
return getWorkflowEnvironment(ctx).IsReplaying()
909+
}

workflow/workflow.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,3 +367,20 @@ func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version
367367
func SetQueryHandler(ctx Context, queryType string, handler interface{}) error {
368368
return internal.SetQueryHandler(ctx, queryType, handler)
369369
}
370+
371+
// IsReplaying returns whether the current workflow code is replaying.
372+
//
373+
// Warning! Never make decisions, like schedule activity/childWorkflow/timer or send/wait on future/channel, based on
374+
// this flag as it is going to break workflow determinism requirement.
375+
// The only reasonable use case for this flag is to avoid some external actions during replay, like custom logging or
376+
// metric reporting. Please note that Cadence already provide standard logging/metric via workflow.GetLogger(ctx) and
377+
// workflow.GetMetricsScope(ctx), and those standard mechanism are replay-aware and it will automatically suppress during
378+
// replay. Only use this flag if you need custom logging/metrics reporting, for example if you want to log to kafka.
379+
//
380+
// Warning! Any action protected by this flag should not fail or if it does fail should ignore that failure or panic
381+
// on the failure. If workflow don't want to be blocked on those failure, it should ignore those failure; if workflow do
382+
// want to make sure it proceed only when that action succeed then it should panic on that failure. Panic raised from a
383+
// workflow causes decision task to fail and cadence server will rescheduled later to retry.
384+
func IsReplaying(ctx Context) bool {
385+
return internal.IsReplaying(ctx)
386+
}

0 commit comments

Comments
 (0)