Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,18 @@ that task will be sent to DLQ.`,
false,
`If true, validate the start time of the old workflow is older than WorkflowIdReuseMinimalInterval when reusing workflow ID.`,
)
WorkflowIDReuseRate = NewNamespaceIntSetting(
"history.workflowIDReuseRate",
0,
`WorkflowIDReuseRate limits the rate of new workflow execution creation per
(namespace, workflowID) pair on a single history host. 0 = disabled (default).`,
)
WorkflowIDReuseBurstRatio = NewNamespaceFloatSetting(
"history.workflowIDReuseBurstRatio",
1.0,
`WorkflowIDReuseBurstRatio is the burst-to-rate ratio for the per-(namespace, workflowID)
start rate limiter. Burst = max(1, int(rps * ratio)). Default 1.0 (no burst above rate).`,
)
HealthPersistenceLatencyFailure = NewGlobalFloatSetting(
"history.healthPersistenceLatencyFailure",
500,
Expand Down
5 changes: 5 additions & 0 deletions service/history/api/resetworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
historyi "go.temporal.io/server/service/history/interfaces"
"go.temporal.io/server/service/history/ndc"
)
Expand All @@ -40,6 +41,10 @@ func Invoke(

request := resetRequest.ResetRequest
workflowID := request.WorkflowExecution.GetWorkflowId()

if rl := shardContext.WorkflowIDReuseRL(namespaceID, workflowID); rl != nil && !rl.Allow() {
return nil, consts.ErrWorkflowIDRateLimitExceeded
}
baseRunID := request.WorkflowExecution.GetRunId()

baseWorkflowLease, err := workflowConsistencyChecker.GetWorkflowLease(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func startAndSignalWithoutCurrentWorkflow(
newWorkflowLease.GetMutableState(),
newWorkflow,
newWorkflowEventsSeq,
historyi.TransactionPolicyActive,
)
switch failedErr := err.(type) {
case nil:
Expand Down
2 changes: 2 additions & 0 deletions service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (s *Starter) createBrandNew(ctx context.Context, creationParams *creationPa
creationParams.workflowLease.GetMutableState(),
creationParams.workflowSnapshot,
creationParams.workflowEventBatches,
historyi.TransactionPolicyActive,
)
}

Expand Down Expand Up @@ -394,6 +395,7 @@ func (s *Starter) createAsCurrent(
creationParams.workflowLease.GetMutableState(),
creationParams.workflowSnapshot,
creationParams.workflowEventBatches,
historyi.TransactionPolicyActive,
)
}

Expand Down
2 changes: 2 additions & 0 deletions service/history/chasm_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ func (e *ChasmEngine) persistAsBrandNew(
newExecutionParams.mutableState,
newExecutionParams.snapshot,
newExecutionParams.events,
historyi.TransactionPolicyActive,
)
if err == nil {
return currentExecutionInfo{}, false, nil
Expand Down Expand Up @@ -1047,6 +1048,7 @@ func (e *ChasmEngine) handleReusePolicy(
newExecutionParams.mutableState,
newExecutionParams.snapshot,
newExecutionParams.events,
historyi.TransactionPolicyActive,
)
if err != nil {
return chasm.StartExecutionResult{}, err
Expand Down
4 changes: 4 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ type Config struct {

WorkflowIdReuseMinimalInterval dynamicconfig.DurationPropertyFnWithNamespaceFilter
EnableWorkflowIdReuseStartTimeValidation dynamicconfig.BoolPropertyFnWithNamespaceFilter
WorkflowIDReuseRate dynamicconfig.IntPropertyFnWithNamespaceFilter
WorkflowIDReuseBurstRatio dynamicconfig.FloatPropertyFnWithNamespaceFilter

HealthPersistenceLatencyFailure dynamicconfig.FloatPropertyFn
HealthPersistenceErrorRatio dynamicconfig.FloatPropertyFn
Expand Down Expand Up @@ -771,6 +773,8 @@ func NewConfig(
SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc),
WorkflowIdReuseMinimalInterval: dynamicconfig.WorkflowIdReuseMinimalInterval.Get(dc),
EnableWorkflowIdReuseStartTimeValidation: dynamicconfig.EnableWorkflowIdReuseStartTimeValidation.Get(dc),
WorkflowIDReuseRate: dynamicconfig.WorkflowIDReuseRate.Get(dc),
WorkflowIDReuseBurstRatio: dynamicconfig.WorkflowIDReuseBurstRatio.Get(dc),

HealthPersistenceLatencyFailure: dynamicconfig.HealthPersistenceLatencyFailure.Get(dc),
HealthPersistenceErrorRatio: dynamicconfig.HealthPersistenceErrorRatio.Get(dc),
Expand Down
6 changes: 6 additions & 0 deletions service/history/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ var (

// ErrResetRedirectLimitReached indicates a possible long chain (or a loop) of resets that cannot be handled.
ErrResetRedirectLimitReached = serviceerror.NewInternal("The chain of resets is too long to iterate.")
// ErrWorkflowIDRateLimitExceeded is returned when the per-(namespace, workflowID) start rate limit is exceeded.
ErrWorkflowIDRateLimitExceeded = &serviceerror.ResourceExhausted{
Cause: enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT,
Scope: enumspb.RESOURCE_EXHAUSTED_SCOPE_NAMESPACE,
Message: "workflow ID start rate limit exceeded",
}
)

// StaleStateError is an indicator that after loading the state for a task it was detected as stale. It's possible that
Expand Down
3 changes: 3 additions & 0 deletions service/history/interfaces/shard_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/pingable"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/configs"
Expand Down Expand Up @@ -106,6 +107,8 @@ type (
GetFinalizer() *finalizer.Finalizer

ChasmRegistry() *chasm.Registry

WorkflowIDReuseRL(namespaceID namespace.ID, workflowID string) quotas.RateLimiter
}

// A ControllableContext is a Context plus other methods needed by
Expand Down
29 changes: 29 additions & 0 deletions service/history/interfaces/shard_context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions service/history/interfaces/workflow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type (
newMutableState MutableState,
newWorkflow *persistence.WorkflowSnapshot,
newWorkflowEvents []*persistence.WorkflowEvents,
transactionPolicy TransactionPolicy,
) error
ConflictResolveWorkflowExecution(
ctx context.Context,
Expand Down
8 changes: 4 additions & 4 deletions service/history/interfaces/workflow_context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions service/history/ndc/transaction_manager_new_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsCurrent(
targetWorkflow.GetMutableState(),
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
historyi.TransactionPolicyPassive,
)
}

Expand All @@ -180,6 +181,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsCurrent(
targetWorkflow.GetMutableState(),
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
historyi.TransactionPolicyPassive,
)
}

Expand Down Expand Up @@ -254,6 +256,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
ms,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
historyi.TransactionPolicyPassive,
)
switch err.(type) {
case nil:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_BrandNew(
mutableState,
workflowSnapshot,
workflowEventsSeq,
gomock.Any(),
).Return(nil)

err := s.createMgr.dispatchForNewWorkflow(ctx, chasm.WorkflowArchetypeID, newWorkflow)
Expand Down Expand Up @@ -195,6 +196,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsC
targetMutableState,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
gomock.Any(),
).Return(nil)

err := s.createMgr.dispatchForNewWorkflow(ctx, chasm.WorkflowArchetypeID, targetWorkflow)
Expand Down Expand Up @@ -264,6 +266,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
gomock.Any(),
).Return(nil)
targetContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, targetWorkflowEventsSeq).Return(nil)

Expand Down Expand Up @@ -343,6 +346,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
gomock.Any(),
).Return(nil)
targetContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, eventsToApply).Return(nil)

Expand Down Expand Up @@ -413,6 +417,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
gomock.Any(),
).Return(&persistence.WorkflowConditionFailedError{})
targetContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, targetWorkflowEventsSeq).Return(nil)

Expand Down
2 changes: 2 additions & 0 deletions service/history/ndc/workflow_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (s *workflowReplicatorSuite) Test_ApplyWorkflowState_BrandNew() {
gomock.Any(),
gomock.Any(),
[]*persistence.WorkflowEvents{},
gomock.Any(),
).Return(nil)
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(namespace.NewNamespaceForTest(
&persistencespb.NamespaceInfo{Name: namespaceName},
Expand Down Expand Up @@ -307,6 +308,7 @@ func (s *workflowReplicatorSuite) Test_ApplyWorkflowState_Ancestors() {
gomock.Any(),
gomock.Any(),
[]*persistence.WorkflowEvents{},
gomock.Any(),
).Return(nil)
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(namespace.NewNamespaceForTest(
&persistencespb.NamespaceInfo{Name: namespaceName},
Expand Down
31 changes: 31 additions & 0 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/quotas"
cclock "go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/config"
Expand Down Expand Up @@ -72,6 +74,9 @@ const (
queueMetricUpdateInterval = 5 * time.Minute

pendingMaxReplicationTaskID = math.MaxInt64

workflowIDRateLimiterCacheSize = 10000
workflowIDRateLimiterCacheTTL = 60 * time.Second
)

var (
Expand Down Expand Up @@ -149,6 +154,8 @@ type (
stateMachineRegistry *hsm.Registry

chasmRegistry *chasm.Registry

workflowIDRateLimiters cache.StoppableCache
}

remoteClusterInfo struct {
Expand Down Expand Up @@ -2126,6 +2133,7 @@ func newContext(
ioSemaphore: locks.NewPrioritySemaphore(ioConcurrency),
stateMachineRegistry: stateMachineRegistry,
chasmRegistry: chasmRegistry,
workflowIDRateLimiters: cache.New(workflowIDRateLimiterCacheSize, &cache.Options{TTL: workflowIDRateLimiterCacheTTL}),
}
shardContext.taskKeyManager = newTaskKeyManager(
shardContext.taskCategoryRegistry,
Expand Down Expand Up @@ -2232,6 +2240,29 @@ func (s *ContextImpl) ChasmRegistry() *chasm.Registry {
return s.chasmRegistry
}

func (s *ContextImpl) WorkflowIDReuseRL(namespaceID namespace.ID, workflowID string) quotas.RateLimiter {
rps := s.config.WorkflowIDReuseRate(namespaceID.String())
if rps <= 0 {
return nil
}
burst := max(1, int(float64(rps)*s.config.WorkflowIDReuseBurstRatio(namespaceID.String())))
key := namespaceID.String() + "/" + workflowID
existing := s.workflowIDRateLimiters.Get(key)
if existing == nil {
rl := quotas.NewRateLimiter(float64(rps), burst)
existing, _ = s.workflowIDRateLimiters.PutIfNotExist(key, rl)
}
rl, ok := existing.(*quotas.RateLimiterImpl)
if !ok {
// Should never happen; cache only stores *RateLimiterImpl.
return nil
}
if float64(rps) != rl.Rate() || burst != rl.Burst() {
rl.SetRateBurst(float64(rps), burst)
}
return rl
}

func (s *ContextImpl) GetCachedWorkflowContext(
ctx context.Context,
namespaceID namespace.ID,
Expand Down
2 changes: 2 additions & 0 deletions service/history/shard/context_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/future"
Expand Down Expand Up @@ -150,6 +151,7 @@ func newTestContext(t *resourcetest.Test, eventsCache events.Cache, config Conte
namespaceRegistry: registry,
stateMachineRegistry: hsm.NewRegistry(),
chasmRegistry: chasm.NewRegistry(t.GetLogger()),
workflowIDRateLimiters: cache.New(workflowIDRateLimiterCacheSize, &cache.Options{TTL: workflowIDRateLimiterCacheTTL}),
persistenceShardManager: t.GetShardManager(),
clientBean: t.GetClientBean(),
saProvider: t.GetSearchAttributesProvider(),
Expand Down
Loading
Loading