Skip to content

Commit

Permalink
Allow dynamic worker queue to be set (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
vandyliu authored Dec 18, 2024
1 parent c781538 commit 9edf368
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 33 deletions.
10 changes: 5 additions & 5 deletions internal/provider/resources/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ func (r *ClusterResource) Create(

// Wait for the cluster to be created (or fail)
stateConf := &retry.StateChangeConf{
Pending: []string{string(platform.ClusterStatusCREATING), string(platform.ClusterStatusUPDATING)},
Target: []string{string(platform.ClusterStatusCREATED), string(platform.ClusterStatusUPDATEFAILED), string(platform.ClusterStatusCREATEFAILED)},
Pending: []string{string(platform.ClusterStatusCREATING), string(platform.ClusterStatusUPDATING), string(platform.ClusterStatusUPGRADEPENDING)},
Target: []string{string(platform.ClusterStatusCREATED), string(platform.ClusterStatusUPDATEFAILED), string(platform.ClusterStatusCREATEFAILED), string(platform.ClusterStatusACCESSDENIED)},
Refresh: ClusterResourceRefreshFunc(ctx, r.platformClient, r.organizationId, cluster.JSON200.Id),
Timeout: 3 * time.Hour,
MinTimeout: 1 * time.Minute,
Expand Down Expand Up @@ -364,8 +364,8 @@ func (r *ClusterResource) Update(

// Wait for the cluster to be updated (or fail)
stateConf := &retry.StateChangeConf{
Pending: []string{string(platform.ClusterStatusCREATING), string(platform.ClusterStatusUPDATING)},
Target: []string{string(platform.ClusterStatusCREATED), string(platform.ClusterStatusUPDATEFAILED), string(platform.ClusterStatusCREATEFAILED)},
Pending: []string{string(platform.ClusterStatusCREATING), string(platform.ClusterStatusUPDATING), string(platform.ClusterStatusUPGRADEPENDING)},
Target: []string{string(platform.ClusterStatusCREATED), string(platform.ClusterStatusUPDATEFAILED), string(platform.ClusterStatusCREATEFAILED), string(platform.ClusterStatusACCESSDENIED)},
Refresh: ClusterResourceRefreshFunc(ctx, r.platformClient, r.organizationId, cluster.JSON200.Id),
Timeout: 3 * time.Hour,
MinTimeout: 1 * time.Minute,
Expand Down Expand Up @@ -435,7 +435,7 @@ func (r *ClusterResource) Delete(

// Wait for the cluster to be deleted
stateConf := &retry.StateChangeConf{
Pending: []string{string(platform.ClusterStatusCREATING), string(platform.ClusterStatusUPDATING), string(platform.ClusterStatusCREATED), string(platform.ClusterStatusUPDATEFAILED), string(platform.ClusterStatusCREATEFAILED)},
Pending: []string{string(platform.ClusterStatusCREATING), string(platform.ClusterStatusUPDATING), string(platform.ClusterStatusCREATED), string(platform.ClusterStatusUPDATEFAILED), string(platform.ClusterStatusCREATEFAILED), string(platform.ClusterStatusUPGRADEPENDING)},
Target: []string{"DELETED"},
Refresh: ClusterResourceRefreshFunc(ctx, r.platformClient, r.organizationId, data.Id.ValueString()),
Timeout: 1 * time.Hour,
Expand Down
7 changes: 4 additions & 3 deletions internal/provider/resources/resource_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,10 @@ func TestAcc_ResourceClusterAwsWithDedicatedDeployments(t *testing.T) {
},
// Import existing cluster and check it is correctly imported - https://stackoverflow.com/questions/68824711/how-can-i-test-terraform-import-in-acceptance-tests
{
ResourceName: awsResourceVar,
ImportState: true,
ImportStateVerify: true,
ResourceName: awsResourceVar,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"health_status", "health_status.value"},
},
},
})
Expand Down
14 changes: 0 additions & 14 deletions internal/provider/resources/resource_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,20 +645,6 @@ func (r *DeploymentResource) ValidateConfig(
return
}

// Need to do dynamic validation based on the executor and worker queues
if data.Executor.ValueString() == string(platform.DeploymentExecutorKUBERNETES) && len(data.WorkerQueues.Elements()) > 0 {
resp.Diagnostics.AddError(
"worker_queues are not supported for 'KUBERNETES' executor",
"Either change the executor to 'CELERY' or remove worker_queues",
)
}
if data.Executor.ValueString() == string(platform.DeploymentExecutorCELERY) && (data.WorkerQueues.IsNull() || len(data.WorkerQueues.Elements()) == 0) {
resp.Diagnostics.AddError(
"worker_queues must be included for 'CELERY' executor",
"Either change the executor to 'KUBERNETES' or include worker_queues",
)
}

// Type specific validation
switch platform.DeploymentType(data.Type.ValueString()) {
case platform.DeploymentTypeSTANDARD:
Expand Down
67 changes: 56 additions & 11 deletions internal/provider/resources/resource_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestAcc_ResourceDeploymentStandard(t *testing.T) {
Executor: "CELERY",
SchedulerSize: string(platform.SchedulerMachineNameEXTRALARGE),
IncludeEnvironmentVariables: false,
DuplicateWorkerQueues: true,
WorkerQueuesStr: workerQueuesDuplicateStr(""),
}),
ExpectError: regexp.MustCompile(`worker_queue names must be unique`),
},
Expand Down Expand Up @@ -193,6 +193,7 @@ func TestAcc_ResourceDeploymentStandard(t *testing.T) {
Executor: "CELERY",
SchedulerSize: string(platform.SchedulerMachineNameEXTRALARGE),
IncludeEnvironmentVariables: false,
WorkerQueuesStr: workerQueuesStr(""),
}),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(awsResourceVar, "description", utils.TestResourceDescription),
Expand All @@ -204,6 +205,56 @@ func TestAcc_ResourceDeploymentStandard(t *testing.T) {
testAccCheckDeploymentExistence(t, awsDeploymentName, true, true),
),
},
// Change worker queues to depend on a variable
{
Config: `
variable "env" {
type = string
default = "dev"
}
locals {
worker_queue_config = {
dev = [
{
name = "default"
is_default = true
astro_machine = "A5"
max_worker_count = 10
min_worker_count = 0
worker_concurrency = 5
}
]
default = [
{
name = "default"
is_default = false
astro_machine = "A10"
max_worker_count = 3
min_worker_count = 1
worker_concurrency = 10
}
]
}
}
` +
astronomerprovider.ProviderConfig(t, astronomerprovider.HOSTED) + standardDeployment(standardDeploymentInput{
Name: awsDeploymentName,
Description: utils.TestResourceDescription,
Region: "us-east-1",
CloudProvider: "AWS",
Executor: "CELERY",
SchedulerSize: string(platform.SchedulerMachineNameMEDIUM),
IncludeEnvironmentVariables: false,
WorkerQueuesStr: `worker_queues = lookup(local.worker_queue_config, var.env, local.worker_queue_config["default"])`,
}),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(awsResourceVar, "executor", "CELERY"),
resource.TestCheckResourceAttr(awsResourceVar, "worker_queues.0.name", "default"),
// Check via API that deployment exists
testAccCheckDeploymentExistence(t, awsDeploymentName, true, true),
),
},
// Change executor back to KUBERNETES and check it is correctly updated in terraform state
{
Config: astronomerprovider.ProviderConfig(t, astronomerprovider.HOSTED) + standardDeployment(standardDeploymentInput{
Expand Down Expand Up @@ -290,6 +341,7 @@ func TestAcc_ResourceDeploymentStandard(t *testing.T) {
Executor: "CELERY",
SchedulerSize: string(platform.SchedulerMachineNameSMALL),
IncludeEnvironmentVariables: true,
WorkerQueuesStr: workerQueuesStr(""),
}),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(azureCeleryResourceVar, "name", azureCeleryDeploymentName),
Expand Down Expand Up @@ -724,6 +776,7 @@ func developmentDeployment(scalingSpecDeploymentName, scalingSpec string) string
SchedulerSize: string(platform.SchedulerMachineNameSMALL),
IsDevelopmentMode: true,
ScalingSpec: scalingSpec,
WorkerQueuesStr: workerQueuesStr(""),
})
}

Expand All @@ -737,18 +790,10 @@ type standardDeploymentInput struct {
SchedulerSize string
IsDevelopmentMode bool
ScalingSpec string
DuplicateWorkerQueues bool
WorkerQueuesStr string
}

func standardDeployment(input standardDeploymentInput) string {
wqStr := ""
if input.Executor == string(platform.DeploymentExecutorCELERY) {
if input.DuplicateWorkerQueues {
wqStr = workerQueuesDuplicateStr("")
} else {
wqStr = workerQueuesStr("")
}
}
var scalingSpecStr string

if input.IsDevelopmentMode {
Expand Down Expand Up @@ -802,7 +847,7 @@ resource "astro_deployment" "%v" {
}
`,
input.Name, input.Name, utils.TestResourceDescription, input.Name, input.Name, input.Description, input.Region, input.CloudProvider, input.Executor, input.IsDevelopmentMode, input.SchedulerSize, input.Name,
envVarsStr(input.IncludeEnvironmentVariables), wqStr, scalingSpecStr)
envVarsStr(input.IncludeEnvironmentVariables), input.WorkerQueuesStr, scalingSpecStr)
}

func standardDeploymentWithVariableName(input standardDeploymentInput) string {
Expand Down
1 change: 1 addition & 0 deletions internal/provider/resources/resource_team_roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestAcc_ResourceTeamRoles(t *testing.T) {
IncludeEnvironmentVariables: false,
SchedulerSize: string(platform.DeploymentSchedulerSizeSMALL),
IsDevelopmentMode: false,
WorkerQueuesStr: workerQueuesStr(""),
}) +
teamRoles(string(iam.ORGANIZATIONMEMBER),
fmt.Sprintf(`[{workspace_id = %s
Expand Down

0 comments on commit 9edf368

Please sign in to comment.