Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/internal_worker_deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (h *workerDeploymentHandleImpl) SetCurrentVersion(ctx context.Context, opti
ConflictToken: options.ConflictToken,
Identity: identity,
IgnoreMissingTaskQueues: options.IgnoreMissingTaskQueues,
AllowNoPollers: options.AllowNoPollers,
}
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()
Expand Down Expand Up @@ -252,6 +253,7 @@ func (h *workerDeploymentHandleImpl) SetRampingVersion(ctx context.Context, opti
ConflictToken: options.ConflictToken,
Identity: identity,
IgnoreMissingTaskQueues: options.IgnoreMissingTaskQueues,
AllowNoPollers: options.AllowNoPollers,
}
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()
Expand Down
18 changes: 18 additions & 0 deletions internal/worker_deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ type (
//
// Optional: default to reject request when queues are missing.
IgnoreMissingTaskQueues bool

// AllowNoPollers - Override protection against accidentally sending tasks to a version without pollers.
// When false this request will be rejected if no pollers have been seen for the proposed Current Version,
// in order to protect users from routing tasks to pollers that do not exist, leading to possible timeouts.
// Pass `true` here to bypass this protection.
// WARNING: setting this flag could lead to tasks being sent to a version that has no pollers.
//
// Optional: default to reject request when version has never had pollers.
AllowNoPollers bool
}

// WorkerDeploymentSetCurrentVersionResponse is the response for
Expand Down Expand Up @@ -216,6 +225,15 @@ type (
//
// Optional: default to reject request when queues are missing.
IgnoreMissingTaskQueues bool

// AllowNoPollers - Override protection against accidentally sending tasks to a version without pollers.
// When false this request will be rejected if no pollers have been seen for the proposed Current Version,
// in order to protect users from routing tasks to pollers that do not exist, leading to possible timeouts.
// Pass `true` here to bypass this protection.
// WARNING: setting this flag could lead to tasks being sent to a version that has no pollers.
//
// Optional: default to reject request when version has never had pollers.
AllowNoPollers bool
}

// WorkerDeploymentSetRampingVersionResponse is the response for
Expand Down
80 changes: 80 additions & 0 deletions test/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,47 @@ func (ts *WorkerDeploymentTestSuite) TestRampVersions() {
}, 10*time.Second, 300*time.Millisecond)
}

func (ts *WorkerDeploymentTestSuite) TestRampVersion_AllowNoPollers() {
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
ts.T().Skip("temporal server 1.27+ required")
}
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

deploymentName := "deploy-test-" + uuid.NewString()
v1 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "1.0",
}

dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)

// Setting Ramp without the AllowNoPollers flag fails when there are no pollers
_, err := dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
BuildID: v1.BuildID,
ConflictToken: nil,
Percentage: float32(100.0),
})
ts.Error(err)

// Setting Ramp with the AllowNoPollers flag succeeds when there are no pollers
response1, err := dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
BuildID: v1.BuildID,
ConflictToken: nil,
Percentage: float32(100.0),
AllowNoPollers: true,
})
ts.NoError(err)
ts.Nil(response1.PreviousVersion)

// Verify RoutingConfig is as expected
response2, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
ts.NoError(err)
ts.Equal(v1.BuildID, response2.Info.RoutingConfig.RampingVersion.BuildID)
ts.Equal(float32(100.0), response2.Info.RoutingConfig.RampingVersionPercentage)
ts.Nil(response2.Info.RoutingConfig.CurrentVersion)
}

func (ts *WorkerDeploymentTestSuite) TestSetManagerIdentity() {
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
ts.T().Skip("temporal server 1.27+ required")
Expand Down Expand Up @@ -1078,6 +1119,45 @@ func (ts *WorkerDeploymentTestSuite) TestSetManagerIdentity() {
ts.Error(err)
}

func (ts *WorkerDeploymentTestSuite) TestCurrentVersion_AllowNoPollers() {
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
ts.T().Skip("temporal server 1.27+ required")
}
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

deploymentName := "deploy-test-" + uuid.NewString()
v1 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "1.0",
}

dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)

// Setting Current without the AllowNoPollers flag fails when there are no pollers
_, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
BuildID: v1.BuildID,
ConflictToken: nil,
})
ts.Error(err)

// Setting Current with the AllowNoPollers flag succeeds when there are no pollers
response1, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
BuildID: v1.BuildID,
ConflictToken: nil,
AllowNoPollers: true,
})
ts.NoError(err)
ts.Nil(response1.PreviousVersion)

// Verify RoutingConfig is as expected
response2, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
ts.NoError(err)
ts.Equal(v1.BuildID, response2.Info.RoutingConfig.CurrentVersion.BuildID)
ts.Equal(float32(0), response2.Info.RoutingConfig.RampingVersionPercentage)
ts.Nil(response2.Info.RoutingConfig.RampingVersion)
}

func (ts *WorkerDeploymentTestSuite) TestDeleteDeployment() {
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
ts.T().Skip("temporal server 1.27+ required")
Expand Down
Loading