diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 1bf78babd8..858dfa7a1b 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -2756,6 +2756,18 @@ that task will be sent to DLQ.`, false, `If true, validate the start time of the old workflow is older than WorkflowIdReuseMinimalInterval when reusing workflow ID.`, ) + WorkflowIDReuseRate = NewNamespaceIntSetting( + "history.workflowIDReuseRate", + 0, + `WorkflowIDReuseRate limits the rate of new workflow execution creation per +(namespace, workflowID) pair on a single history host. 0 = disabled (default).`, + ) + WorkflowIDReuseBurstRatio = NewNamespaceFloatSetting( + "history.workflowIDReuseBurstRatio", + 1.0, + `WorkflowIDReuseBurstRatio is the burst-to-rate ratio for the per-(namespace, workflowID) +start rate limiter. Burst = max(1, int(rps * ratio)). Default 1.0 (no burst above rate).`, + ) HealthPersistenceLatencyFailure = NewGlobalFloatSetting( "history.healthPersistenceLatencyFailure", 500, diff --git a/service/history/api/resetworkflow/api.go b/service/history/api/resetworkflow/api.go index 42ad357911..eb22d81160 100644 --- a/service/history/api/resetworkflow/api.go +++ b/service/history/api/resetworkflow/api.go @@ -18,6 +18,7 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/consts" historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/ndc" ) @@ -40,6 +41,10 @@ func Invoke( request := resetRequest.ResetRequest workflowID := request.WorkflowExecution.GetWorkflowId() + + if rl := shardContext.WorkflowIDReuseRL(namespaceID, workflowID); rl != nil && !rl.Allow() { + return nil, consts.ErrWorkflowIDRateLimitExceeded + } baseRunID := request.WorkflowExecution.GetRunId() baseWorkflowLease, err := workflowConsistencyChecker.GetWorkflowLease( diff 
--git a/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go b/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go index 3a4324dbf2..0510635ae5 100644 --- a/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go +++ b/service/history/api/signalwithstartworkflow/signal_with_start_workflow.go @@ -256,6 +256,7 @@ func startAndSignalWithoutCurrentWorkflow( newWorkflowLease.GetMutableState(), newWorkflow, newWorkflowEventsSeq, + historyi.TransactionPolicyActive, ) switch failedErr := err.(type) { case nil: diff --git a/service/history/api/startworkflow/api.go b/service/history/api/startworkflow/api.go index 030c9e36ab..f6f53af5c9 100644 --- a/service/history/api/startworkflow/api.go +++ b/service/history/api/startworkflow/api.go @@ -319,6 +319,7 @@ func (s *Starter) createBrandNew(ctx context.Context, creationParams *creationPa creationParams.workflowLease.GetMutableState(), creationParams.workflowSnapshot, creationParams.workflowEventBatches, + historyi.TransactionPolicyActive, ) } @@ -394,6 +395,7 @@ func (s *Starter) createAsCurrent( creationParams.workflowLease.GetMutableState(), creationParams.workflowSnapshot, creationParams.workflowEventBatches, + historyi.TransactionPolicyActive, ) } diff --git a/service/history/chasm_engine.go b/service/history/chasm_engine.go index 77cac2d219..6462146769 100644 --- a/service/history/chasm_engine.go +++ b/service/history/chasm_engine.go @@ -862,6 +862,7 @@ func (e *ChasmEngine) persistAsBrandNew( newExecutionParams.mutableState, newExecutionParams.snapshot, newExecutionParams.events, + historyi.TransactionPolicyActive, ) if err == nil { return currentExecutionInfo{}, false, nil @@ -1047,6 +1048,7 @@ func (e *ChasmEngine) handleReusePolicy( newExecutionParams.mutableState, newExecutionParams.snapshot, newExecutionParams.events, + historyi.TransactionPolicyActive, ) if err != nil { return chasm.StartExecutionResult{}, err diff --git a/service/history/configs/config.go 
b/service/history/configs/config.go index 5332fcb23f..4d9987d30f 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -395,6 +395,8 @@ type Config struct { WorkflowIdReuseMinimalInterval dynamicconfig.DurationPropertyFnWithNamespaceFilter EnableWorkflowIdReuseStartTimeValidation dynamicconfig.BoolPropertyFnWithNamespaceFilter + WorkflowIDReuseRate dynamicconfig.IntPropertyFnWithNamespaceFilter + WorkflowIDReuseBurstRatio dynamicconfig.FloatPropertyFnWithNamespaceFilter HealthPersistenceLatencyFailure dynamicconfig.FloatPropertyFn HealthPersistenceErrorRatio dynamicconfig.FloatPropertyFn @@ -771,6 +773,8 @@ func NewConfig( SendRawWorkflowHistory: dynamicconfig.SendRawWorkflowHistory.Get(dc), WorkflowIdReuseMinimalInterval: dynamicconfig.WorkflowIdReuseMinimalInterval.Get(dc), EnableWorkflowIdReuseStartTimeValidation: dynamicconfig.EnableWorkflowIdReuseStartTimeValidation.Get(dc), + WorkflowIDReuseRate: dynamicconfig.WorkflowIDReuseRate.Get(dc), + WorkflowIDReuseBurstRatio: dynamicconfig.WorkflowIDReuseBurstRatio.Get(dc), HealthPersistenceLatencyFailure: dynamicconfig.HealthPersistenceLatencyFailure.Get(dc), HealthPersistenceErrorRatio: dynamicconfig.HealthPersistenceErrorRatio.Get(dc), diff --git a/service/history/consts/const.go b/service/history/consts/const.go index 7c74276ab7..1c9ac7a9fe 100644 --- a/service/history/consts/const.go +++ b/service/history/consts/const.go @@ -137,6 +137,12 @@ var ( // ErrResetRedirectLimitReached indicates a possible long chain (or a loop) of resets that cannot be handled. ErrResetRedirectLimitReached = serviceerror.NewInternal("The chain of resets is too long to iterate.") + // ErrWorkflowIDRateLimitExceeded is returned when the per-(namespace, workflowID) start rate limit is exceeded. 
+ ErrWorkflowIDRateLimitExceeded = &serviceerror.ResourceExhausted{ + Cause: enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, + Scope: enumspb.RESOURCE_EXHAUSTED_SCOPE_NAMESPACE, + Message: "workflow ID start rate limit exceeded", + } ) // StaleStateError is an indicator that after loading the state for a task it was detected as stale. It's possible that diff --git a/service/history/interfaces/shard_context.go b/service/history/interfaces/shard_context.go index 96d959d493..c3d8bb1191 100644 --- a/service/history/interfaces/shard_context.go +++ b/service/history/interfaces/shard_context.go @@ -21,6 +21,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/pingable" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" @@ -106,6 +107,8 @@ type ( GetFinalizer() *finalizer.Finalizer ChasmRegistry() *chasm.Registry + + WorkflowIDReuseRL(namespaceID namespace.ID, workflowID string) quotas.RateLimiter } // A ControllableContext is a Context plus other methods needed by diff --git a/service/history/interfaces/shard_context_mock.go b/service/history/interfaces/shard_context_mock.go index 9e537e6d95..01ec64eacc 100644 --- a/service/history/interfaces/shard_context_mock.go +++ b/service/history/interfaces/shard_context_mock.go @@ -31,6 +31,7 @@ import ( namespace "go.temporal.io/server/common/namespace" persistence0 "go.temporal.io/server/common/persistence" serialization "go.temporal.io/server/common/persistence/serialization" + quotas "go.temporal.io/server/common/quotas" pingable "go.temporal.io/server/common/pingable" searchattribute "go.temporal.io/server/common/searchattribute" configs "go.temporal.io/server/service/history/configs" @@ -724,6 +725,20 @@ func (mr *MockShardContextMockRecorder) StateMachineRegistry() *gomock.Call { return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMachineRegistry", reflect.TypeOf((*MockShardContext)(nil).StateMachineRegistry)) } +// WorkflowIDReuseRL mocks base method. +func (m *MockShardContext) WorkflowIDReuseRL(namespaceID namespace.ID, workflowID string) quotas.RateLimiter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WorkflowIDReuseRL", namespaceID, workflowID) + ret0, _ := ret[0].(quotas.RateLimiter) + return ret0 +} + +// WorkflowIDReuseRL indicates an expected call of WorkflowIDReuseRL. +func (mr *MockShardContextMockRecorder) WorkflowIDReuseRL(namespaceID, workflowID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkflowIDReuseRL", reflect.TypeOf((*MockShardContext)(nil).WorkflowIDReuseRL), namespaceID, workflowID) +} + // UnloadForOwnershipLost mocks base method. func (m *MockShardContext) UnloadForOwnershipLost() { m.ctrl.T.Helper() @@ -1553,6 +1568,20 @@ func (mr *MockControllableContextMockRecorder) UnloadForOwnershipLost() *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnloadForOwnershipLost", reflect.TypeOf((*MockControllableContext)(nil).UnloadForOwnershipLost)) } +// WorkflowIDReuseRL mocks base method. +func (m *MockControllableContext) WorkflowIDReuseRL(namespaceID namespace.ID, workflowID string) quotas.RateLimiter { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WorkflowIDReuseRL", namespaceID, workflowID) + ret0, _ := ret[0].(quotas.RateLimiter) + return ret0 +} + +// WorkflowIDReuseRL indicates an expected call of WorkflowIDReuseRL. +func (mr *MockControllableContextMockRecorder) WorkflowIDReuseRL(namespaceID, workflowID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkflowIDReuseRL", reflect.TypeOf((*MockControllableContext)(nil).WorkflowIDReuseRL), namespaceID, workflowID) +} + // UpdateHandoverNamespace mocks base method. 
func (m *MockControllableContext) UpdateHandoverNamespace(ns *namespace.Namespace, deletedFromDb bool) { m.ctrl.T.Helper() diff --git a/service/history/interfaces/workflow_context.go b/service/history/interfaces/workflow_context.go index 0378d8469a..99af1a95df 100644 --- a/service/history/interfaces/workflow_context.go +++ b/service/history/interfaces/workflow_context.go @@ -54,6 +54,7 @@ type ( newMutableState MutableState, newWorkflow *persistence.WorkflowSnapshot, newWorkflowEvents []*persistence.WorkflowEvents, + transactionPolicy TransactionPolicy, ) error ConflictResolveWorkflowExecution( ctx context.Context, diff --git a/service/history/interfaces/workflow_context_mock.go b/service/history/interfaces/workflow_context_mock.go index e9d886d71e..f2dc1499e3 100644 --- a/service/history/interfaces/workflow_context_mock.go +++ b/service/history/interfaces/workflow_context_mock.go @@ -72,17 +72,17 @@ func (mr *MockWorkflowContextMockRecorder) ConflictResolveWorkflowExecution(ctx, } // CreateWorkflowExecution mocks base method. 
-func (m *MockWorkflowContext) CreateWorkflowExecution(ctx context.Context, shardContext ShardContext, createMode persistence0.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64, newMutableState MutableState, newWorkflow *persistence0.WorkflowSnapshot, newWorkflowEvents []*persistence0.WorkflowEvents) error { +func (m *MockWorkflowContext) CreateWorkflowExecution(ctx context.Context, shardContext ShardContext, createMode persistence0.CreateWorkflowMode, prevRunID string, prevLastWriteVersion int64, newMutableState MutableState, newWorkflow *persistence0.WorkflowSnapshot, newWorkflowEvents []*persistence0.WorkflowEvents, transactionPolicy TransactionPolicy) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, shardContext, createMode, prevRunID, prevLastWriteVersion, newMutableState, newWorkflow, newWorkflowEvents) + ret := m.ctrl.Call(m, "CreateWorkflowExecution", ctx, shardContext, createMode, prevRunID, prevLastWriteVersion, newMutableState, newWorkflow, newWorkflowEvents, transactionPolicy) ret0, _ := ret[0].(error) return ret0 } // CreateWorkflowExecution indicates an expected call of CreateWorkflowExecution. 
-func (mr *MockWorkflowContextMockRecorder) CreateWorkflowExecution(ctx, shardContext, createMode, prevRunID, prevLastWriteVersion, newMutableState, newWorkflow, newWorkflowEvents any) *gomock.Call { +func (mr *MockWorkflowContextMockRecorder) CreateWorkflowExecution(ctx, shardContext, createMode, prevRunID, prevLastWriteVersion, newMutableState, newWorkflow, newWorkflowEvents, transactionPolicy any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockWorkflowContext)(nil).CreateWorkflowExecution), ctx, shardContext, createMode, prevRunID, prevLastWriteVersion, newMutableState, newWorkflow, newWorkflowEvents) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowExecution", reflect.TypeOf((*MockWorkflowContext)(nil).CreateWorkflowExecution), ctx, shardContext, createMode, prevRunID, prevLastWriteVersion, newMutableState, newWorkflow, newWorkflowEvents, transactionPolicy) } // GetWorkflowKey mocks base method. 
diff --git a/service/history/ndc/transaction_manager_new_workflow.go b/service/history/ndc/transaction_manager_new_workflow.go index eb8e91306f..027e2cb678 100644 --- a/service/history/ndc/transaction_manager_new_workflow.go +++ b/service/history/ndc/transaction_manager_new_workflow.go @@ -164,6 +164,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsCurrent( targetWorkflow.GetMutableState(), targetWorkflowSnapshot, targetWorkflowEventsSeq, + historyi.TransactionPolicyPassive, ) } @@ -180,6 +181,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsCurrent( targetWorkflow.GetMutableState(), targetWorkflowSnapshot, targetWorkflowEventsSeq, + historyi.TransactionPolicyPassive, ) } @@ -254,6 +256,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie( ms, targetWorkflowSnapshot, targetWorkflowEventsSeq, + historyi.TransactionPolicyPassive, ) switch err.(type) { case nil: diff --git a/service/history/ndc/transaction_manager_new_workflow_test.go b/service/history/ndc/transaction_manager_new_workflow_test.go index aa1146e8cd..c2a9ed194c 100644 --- a/service/history/ndc/transaction_manager_new_workflow_test.go +++ b/service/history/ndc/transaction_manager_new_workflow_test.go @@ -122,6 +122,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_BrandNew( mutableState, workflowSnapshot, workflowEventsSeq, + gomock.Any(), ).Return(nil) err := s.createMgr.dispatchForNewWorkflow(ctx, chasm.WorkflowArchetypeID, newWorkflow) @@ -195,6 +196,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsC targetMutableState, targetWorkflowSnapshot, targetWorkflowEventsSeq, + gomock.Any(), ).Return(nil) err := s.createMgr.dispatchForNewWorkflow(ctx, chasm.WorkflowArchetypeID, targetWorkflow) @@ -264,6 +266,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ targetMutableState, targetWorkflowSnapshot, targetWorkflowEventsSeq, + gomock.Any(), ).Return(nil) 
targetContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, targetWorkflowEventsSeq).Return(nil) @@ -343,6 +346,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ targetMutableState, targetWorkflowSnapshot, targetWorkflowEventsSeq, + gomock.Any(), ).Return(nil) targetContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, eventsToApply).Return(nil) @@ -413,6 +417,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ targetMutableState, targetWorkflowSnapshot, targetWorkflowEventsSeq, + gomock.Any(), ).Return(&persistence.WorkflowConditionFailedError{}) targetContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, targetWorkflowEventsSeq).Return(nil) diff --git a/service/history/ndc/workflow_state_replicator_test.go b/service/history/ndc/workflow_state_replicator_test.go index de61cfd990..2f92b98999 100644 --- a/service/history/ndc/workflow_state_replicator_test.go +++ b/service/history/ndc/workflow_state_replicator_test.go @@ -189,6 +189,7 @@ func (s *workflowReplicatorSuite) Test_ApplyWorkflowState_BrandNew() { gomock.Any(), gomock.Any(), []*persistence.WorkflowEvents{}, + gomock.Any(), ).Return(nil) s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(namespace.NewNamespaceForTest( &persistencespb.NamespaceInfo{Name: namespaceName}, @@ -307,6 +308,7 @@ func (s *workflowReplicatorSuite) Test_ApplyWorkflowState_Ancestors() { gomock.Any(), gomock.Any(), []*persistence.WorkflowEvents{}, + gomock.Any(), ).Return(nil) s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(namespaceID)).Return(namespace.NewNamespaceForTest( &persistencespb.NamespaceInfo{Name: namespaceName}, diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 13be692653..8d122b9374 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -24,6 +24,8 @@ import ( "go.temporal.io/server/common" 
"go.temporal.io/server/common/archiver"
 	"go.temporal.io/server/common/backoff"
+	"go.temporal.io/server/common/cache"
+	"go.temporal.io/server/common/quotas"
 	cclock "go.temporal.io/server/common/clock"
 	"go.temporal.io/server/common/cluster"
 	"go.temporal.io/server/common/config"
@@ -72,6 +74,9 @@ const (
 	queueMetricUpdateInterval = 5 * time.Minute
 
 	pendingMaxReplicationTaskID = math.MaxInt64
+
+	workflowIDRateLimiterCacheSize = 10000
+	workflowIDRateLimiterCacheTTL = 60 * time.Second
 )
 
 var (
@@ -149,6 +154,8 @@ type (
 
 		stateMachineRegistry *hsm.Registry
 		chasmRegistry *chasm.Registry
+
+		workflowIDRateLimiters cache.StoppableCache
 	}
 
 	remoteClusterInfo struct {
@@ -2126,6 +2133,7 @@ func newContext(
 		ioSemaphore: locks.NewPrioritySemaphore(ioConcurrency),
 		stateMachineRegistry: stateMachineRegistry,
 		chasmRegistry: chasmRegistry,
+		workflowIDRateLimiters: cache.New(workflowIDRateLimiterCacheSize, &cache.Options{TTL: workflowIDRateLimiterCacheTTL}),
 	}
 	shardContext.taskKeyManager = newTaskKeyManager(
 		shardContext.taskCategoryRegistry,
@@ -2232,6 +2240,48 @@ func (s *ContextImpl) ChasmRegistry() *chasm.Registry {
 	return s.chasmRegistry
 }
 
+// WorkflowIDReuseRL returns the rate limiter that gates creation of new
+// executions for the (namespaceID, workflowID) pair, or nil when the
+// WorkflowIDReuseRate setting for the namespace is zero or negative (disabled).
+//
+// Limiters are built lazily and kept in workflowIDRateLimiters, a cache created
+// with a fixed size and TTL, so an entry that is no longer cached is simply
+// rebuilt on the next call. Rate and burst are re-read from dynamic config on
+// every call and applied to the cached limiter whenever they differ.
+func (s *ContextImpl) WorkflowIDReuseRL(namespaceID namespace.ID, workflowID string) quotas.RateLimiter {
+	nsID := namespaceID.String()
+	rps := s.config.WorkflowIDReuseRate(nsID)
+	if rps <= 0 {
+		return nil
+	}
+	rate := float64(rps)
+	// Clamp to 1 so a small or negative ratio cannot produce a zero burst.
+	burst := max(1, int(rate*s.config.WorkflowIDReuseBurstRatio(nsID)))
+
+	key := nsID + "/" + workflowID
+	cached := s.workflowIDRateLimiters.Get(key)
+	if cached == nil {
+		fresh := quotas.NewRateLimiter(rate, burst)
+		var err error
+		cached, err = s.workflowIDRateLimiters.PutIfNotExist(key, fresh)
+		if err != nil || cached == nil {
+			// The limiter could not be stored. Hand back the uncached one for
+			// this call instead of nil, so a cache write failure is not
+			// mistaken by callers for "rate limiting disabled".
+			return fresh
+		}
+	}
+	rl, ok := cached.(*quotas.RateLimiterImpl)
+	if !ok {
+		// Should never happen; cache only stores *RateLimiterImpl.
+		return nil
+	}
+	if rate != rl.Rate() || burst != rl.Burst() {
+		rl.SetRateBurst(rate, burst)
+	}
+	return rl
+}
+
 func (s *ContextImpl) GetCachedWorkflowContext(
 	ctx context.Context,
 	namespaceID namespace.ID,
diff --git a/service/history/shard/context_testutil.go b/service/history/shard/context_testutil.go
index c456258d7f..097b12c5f1 100644
--- a/service/history/shard/context_testutil.go
+++ b/service/history/shard/context_testutil.go
@@ -8,6 +8,7 @@ import (
 	"go.temporal.io/server/api/historyservice/v1"
 	persistencespb "go.temporal.io/server/api/persistence/v1"
 	"go.temporal.io/server/chasm"
+	"go.temporal.io/server/common/cache"
 	"go.temporal.io/server/common/clock"
 	"go.temporal.io/server/common/cluster"
 	"go.temporal.io/server/common/future"
@@ -150,6 +151,7 @@ func newTestContext(t *resourcetest.Test, eventsCache events.Cache, config Conte
 		namespaceRegistry: registry,
 		stateMachineRegistry: hsm.NewRegistry(),
 		chasmRegistry: chasm.NewRegistry(t.GetLogger()),
+		workflowIDRateLimiters: cache.New(workflowIDRateLimiterCacheSize, &cache.Options{TTL: workflowIDRateLimiterCacheTTL}),
 		persistenceShardManager: t.GetShardManager(),
 		clientBean: t.GetClientBean(),
 		saProvider: t.GetSearchAttributesProvider(),
diff --git a/service/history/shard/workflow_id_rate_limiter_test.go b/service/history/shard/workflow_id_rate_limiter_test.go
new file mode 100644
index 0000000000..236ed6e533
--- /dev/null
+++ b/service/history/shard/workflow_id_rate_limiter_test.go
@@ -0,0 +1,115 @@
+package shard
+
+import (
+	"testing"
+
+	persistencespb "go.temporal.io/server/api/persistence/v1"
+	"go.temporal.io/server/common/dynamicconfig"
+	"go.temporal.io/server/common/namespace"
+	"go.temporal.io/server/service/history/configs"
+	"go.uber.org/mock/gomock"
+
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+)
+
+type workflowIDRateLimiterSuite struct {
+	suite.Suite
+	*require.Assertions
+	controller *gomock.Controller
+	shard *ContextTest
+}
+
+func
TestWorkflowIDRateLimiter(t *testing.T) {
+	suite.Run(t, new(workflowIDRateLimiterSuite))
+}
+
+func (s *workflowIDRateLimiterSuite) SetupTest() {
+	s.Assertions = require.New(s.T())
+	s.controller = gomock.NewController(s.T())
+	s.shard = NewTestContext(
+		s.controller,
+		&persistencespb.ShardInfo{ShardId: 1},
+		configs.NewConfig(dynamicconfig.NewNoopCollection(), 1),
+	)
+}
+
+func (s *workflowIDRateLimiterSuite) TearDownTest() {
+	s.controller.Finish()
+}
+
+func (s *workflowIDRateLimiterSuite) TestWorkflowIDReuseRL_Disabled() {
+	// rps=0 means disabled; WorkflowIDReuseRL should return nil
+	s.shard.config.WorkflowIDReuseRate = func(_ string) int { return 0 }
+	nsID := namespace.ID("test-ns-id")
+	s.Nil(s.shard.WorkflowIDReuseRL(nsID, "wf-id"))
+}
+
+func (s *workflowIDRateLimiterSuite) TestWorkflowIDReuseRL_AllowsUnderLimit() {
+	// 10 RPS with burst=10: first 10 calls to Allow() should succeed
+	s.shard.config.WorkflowIDReuseRate = func(_ string) int { return 10 }
+	nsID := namespace.ID("test-ns-id")
+	rl := s.shard.WorkflowIDReuseRL(nsID, "wf-id")
+	s.NotNil(rl)
+	for i := 0; i < 10; i++ {
+		s.True(rl.Allow())
+	}
+}
+
+func (s *workflowIDRateLimiterSuite) TestWorkflowIDReuseRL_BlocksOverLimit() {
+	// 1 RPS with burst=1: second Allow() should return false
+	s.shard.config.WorkflowIDReuseRate = func(_ string) int { return 1 }
+	nsID := namespace.ID("test-ns-id")
+	rl := s.shard.WorkflowIDReuseRL(nsID, "wf-id")
+	s.NotNil(rl)
+	s.True(rl.Allow())
+	s.False(rl.Allow())
+}
+
+func (s *workflowIDRateLimiterSuite) TestWorkflowIDReuseRL_IndependentWorkflowIDs() {
+	// Different workflow IDs should have independent rate limiters
+	s.shard.config.WorkflowIDReuseRate = func(_ string) int { return 1 }
+	nsID := namespace.ID("test-ns-id")
+	rl1 := s.shard.WorkflowIDReuseRL(nsID, "wf-id-1")
+	rl2 := s.shard.WorkflowIDReuseRL(nsID, "wf-id-2")
+	s.True(rl1.Allow())
+	s.False(rl1.Allow()) // pin that rl1 is exhausted, so rl2 allowing below proves independence
+	s.True(rl2.Allow())
+}
+
+func (s *workflowIDRateLimiterSuite) TestWorkflowIDReuseRL_IndependentNamespaces() {
+	// Same workflow ID in different namespaces should have independent rate limiters
+	s.shard.config.WorkflowIDReuseRate = func(_ string) int { return 1 }
+	ns1 := namespace.ID("ns-1")
+	ns2 := namespace.ID("ns-2")
+	rl1 := s.shard.WorkflowIDReuseRL(ns1, "wf-id")
+	rl2 := s.shard.WorkflowIDReuseRL(ns2, "wf-id")
+	s.True(rl1.Allow())
+	s.False(rl1.Allow()) // pin that rl1 is exhausted, so rl2 allowing below proves independence
+	s.True(rl2.Allow())
+}
+
+func (s *workflowIDRateLimiterSuite) TestWorkflowIDReuseRL_BurstRatio() {
+	// rps=2, ratio=3 → burst=6; first 6 Allow() calls should succeed
+	s.shard.config.WorkflowIDReuseRate = func(_ string) int { return 2 }
+	s.shard.config.WorkflowIDReuseBurstRatio = func(_ string) float64 { return 3.0 }
+	nsID := namespace.ID("test-ns-id")
+	rl := s.shard.WorkflowIDReuseRL(nsID, "wf-id")
+	s.NotNil(rl)
+	for i := 0; i < 6; i++ {
+		s.True(rl.Allow(), "expected Allow() on call %d", i+1)
+	}
+	s.False(rl.Allow(), "expected Allow() to be false after burst exhausted")
+}
+
+func (s *workflowIDRateLimiterSuite) TestWorkflowIDReuseRL_BurstUpdatesOnConfigChange() {
+	// Start with rps=2, ratio=1 (burst=2), then change ratio to 3 (burst=6).
+	s.shard.config.WorkflowIDReuseRate = func(_ string) int { return 2 }
+	s.shard.config.WorkflowIDReuseBurstRatio = func(_ string) float64 { return 1.0 }
+	nsID := namespace.ID("test-ns-id")
+	s.shard.WorkflowIDReuseRL(nsID, "wf-id") // populate cache
+
+	s.shard.config.WorkflowIDReuseBurstRatio = func(_ string) float64 { return 3.0 }
+	rl := s.shard.WorkflowIDReuseRL(nsID, "wf-id") // should update burst to 6
+	s.Equal(6, rl.Burst())
+}
diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go
index 9948b31d03..20fbbd8b78 100644
--- a/service/history/workflow/context.go
+++ b/service/history/workflow/context.go
@@ -243,8 +243,18 @@ func (c *ContextImpl) CreateWorkflowExecution(
 	newMutableState historyi.MutableState,
 	newWorkflow *persistence.WorkflowSnapshot,
 	newWorkflowEvents []*persistence.WorkflowEvents,
+	transactionPolicy historyi.TransactionPolicy,
 ) (retError error) {
 
+	if transactionPolicy == historyi.TransactionPolicyActive {
+		if rl := shardContext.WorkflowIDReuseRL(
+			namespace.ID(c.workflowKey.NamespaceID),
+			c.workflowKey.WorkflowID,
+		); rl != nil && !rl.Allow() {
+			return consts.ErrWorkflowIDRateLimitExceeded
+		}
+	}
+
 	defer func() {
 		if retError != nil {
 			c.Clear()
@@ -536,6 +546,15 @@ func (c *ContextImpl) UpdateWorkflowExecutionWithNew(
 	}()
 
 	if newContext != nil && newMutableState != nil && newWorkflowTransactionPolicy != nil {
+		if *newWorkflowTransactionPolicy == historyi.TransactionPolicyActive {
+			execInfo := newMutableState.GetExecutionInfo()
+			if rl := shardContext.WorkflowIDReuseRL(
+				namespace.ID(execInfo.NamespaceId),
+				execInfo.WorkflowId,
+			); rl != nil && !rl.Allow() {
+				return consts.ErrWorkflowIDRateLimitExceeded
+			}
+		}
 		c.MutableState.SetSuccessorRunID(newMutableState.GetExecutionState().RunId)
 	}