From f1be450fcd0ee4028e80337098762265bfb1b2d9 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Thu, 2 Oct 2025 15:51:52 -0700 Subject: [PATCH 1/2] Pass AllowNoPollers flag to SetCurrentVersion and SetRampingVersion --- internal/internal_worker_deployment_client.go | 2 ++ internal/worker_deployment_client.go | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/internal/internal_worker_deployment_client.go b/internal/internal_worker_deployment_client.go index 30804a819..91314bfa4 100644 --- a/internal/internal_worker_deployment_client.go +++ b/internal/internal_worker_deployment_client.go @@ -213,6 +213,7 @@ func (h *workerDeploymentHandleImpl) SetCurrentVersion(ctx context.Context, opti ConflictToken: options.ConflictToken, Identity: identity, IgnoreMissingTaskQueues: options.IgnoreMissingTaskQueues, + AllowNoPollers: options.AllowNoPollers, } grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) defer cancel() @@ -252,6 +253,7 @@ func (h *workerDeploymentHandleImpl) SetRampingVersion(ctx context.Context, opti ConflictToken: options.ConflictToken, Identity: identity, IgnoreMissingTaskQueues: options.IgnoreMissingTaskQueues, + AllowNoPollers: options.AllowNoPollers, } grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) defer cancel() diff --git a/internal/worker_deployment_client.go b/internal/worker_deployment_client.go index 6964562c2..1d1c589c1 100644 --- a/internal/worker_deployment_client.go +++ b/internal/worker_deployment_client.go @@ -158,6 +158,15 @@ type ( // // Optional: default to reject request when queues are missing. IgnoreMissingTaskQueues bool + + // AllowNoPollers - Override protection against accidentally sending tasks to a version without pollers. + // When false this request will be rejected if no pollers have been seen for the proposed Current Version, + // in order to protect users from routing tasks to pollers that do not exist, leading to possible timeouts. + // Pass `true` here to bypass this protection. + // WARNING: setting this flag could lead to tasks being sent to a version that has no pollers. + // + // Optional: default to reject request when version has never had pollers. + AllowNoPollers bool } // WorkerDeploymentSetCurrentVersionResponse is the response for @@ -216,6 +225,15 @@ type ( // // Optional: default to reject request when queues are missing. IgnoreMissingTaskQueues bool + + // AllowNoPollers - Override protection against accidentally sending tasks to a version without pollers. + // When false this request will be rejected if no pollers have been seen for the proposed Current Version, + // in order to protect users from routing tasks to pollers that do not exist, leading to possible timeouts. + // Pass `true` here to bypass this protection. + // WARNING: setting this flag could lead to tasks being sent to a version that has no pollers. + // + // Optional: default to reject request when version has never had pollers. + AllowNoPollers bool } // WorkerDeploymentSetRampingVersionResponse is the response for From 86ef7fcd6fc587e0dcf97c8f0c1833e0dcce3708 Mon Sep 17 00:00:00 2001 From: Carly de Frondeville Date: Fri, 3 Oct 2025 13:29:21 -0700 Subject: [PATCH 2/2] test allow-no-pollers flag --- test/worker_deployment_test.go | 80 ++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/test/worker_deployment_test.go b/test/worker_deployment_test.go index 7b823e4b2..25a31880c 100644 --- a/test/worker_deployment_test.go +++ b/test/worker_deployment_test.go @@ -993,6 +993,47 @@ func (ts *WorkerDeploymentTestSuite) TestRampVersions() { }, 10*time.Second, 300*time.Millisecond) } +func (ts *WorkerDeploymentTestSuite) TestRampVersion_AllowNoPollers() { + if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { + ts.T().Skip("temporal server 1.27+ required") + } + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + deploymentName := "deploy-test-" + uuid.NewString() + v1 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "1.0", + } + + dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName) + + // Setting Ramp without the AllowNoPollers flag fails when there are no pollers + _, err := dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: nil, + Percentage: float32(100.0), + }) + ts.Error(err) + + // Setting Ramp with the AllowNoPollers flag succeeds when there are no pollers + response1, err := dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: nil, + Percentage: float32(100.0), + AllowNoPollers: true, + }) + ts.NoError(err) + ts.Nil(response1.PreviousVersion) + + // Verify RoutingConfig is as expected + response2, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{}) + ts.NoError(err) + ts.Equal(v1.BuildID, response2.Info.RoutingConfig.RampingVersion.BuildID) + ts.Equal(float32(100.0), response2.Info.RoutingConfig.RampingVersionPercentage) + ts.Nil(response2.Info.RoutingConfig.CurrentVersion) +} + func (ts *WorkerDeploymentTestSuite) TestSetManagerIdentity() { if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { ts.T().Skip("temporal server 1.27+ required") @@ -1078,6 +1119,45 @@ func (ts *WorkerDeploymentTestSuite) TestSetManagerIdentity() { ts.Error(err) } +func (ts *WorkerDeploymentTestSuite) TestCurrentVersion_AllowNoPollers() { + if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { + ts.T().Skip("temporal server 1.27+ required") + } + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + deploymentName := "deploy-test-" + uuid.NewString() + v1 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "1.0", + } + + dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName) + + // Setting Current without the AllowNoPollers flag fails when there are no pollers + _, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: nil, + }) + ts.Error(err) + + // Setting Current with the AllowNoPollers flag succeeds when there are no pollers + response1, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: nil, + AllowNoPollers: true, + }) + ts.NoError(err) + ts.Nil(response1.PreviousVersion) + + // Verify RoutingConfig is as expected + response2, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{}) + ts.NoError(err) + ts.Equal(v1.BuildID, response2.Info.RoutingConfig.CurrentVersion.BuildID) + ts.Equal(float32(0), response2.Info.RoutingConfig.RampingVersionPercentage) + ts.Nil(response2.Info.RoutingConfig.RampingVersion) +} + func (ts *WorkerDeploymentTestSuite) TestDeleteDeployment() { if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { ts.T().Skip("temporal server 1.27+ required")