From 37c858fd73ea5636d77f1bbb7a5d58b9d0472110 Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Mon, 6 Oct 2025 18:51:17 -0500 Subject: [PATCH 1/7] APIE-571 - update for default compute pool --- go.mod | 4 +- go.sum | 2 + internal/flink/command.go | 1 + internal/flink/command_shell.go | 50 +++++++++++++++---- internal/flink/command_statement.go | 2 +- internal/flink/command_statement_create.go | 44 ++++++++++------ .../create-without-compute-pool.golden | 7 +++ test/flink_test.go | 1 + test/test-server/flink_gateway_router.go | 6 ++- 9 files changed, 88 insertions(+), 29 deletions(-) create mode 100644 test/fixtures/output/flink/statement/create-without-compute-pool.golden diff --git a/go.mod b/go.mod index dc0e179098..a181e87c29 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/confluentinc/cli/v4 -go 1.24.6 +go 1.24.7 require ( github.com/antihax/optional v1.0.0 @@ -30,7 +30,7 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 - github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 + github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0 github.com/confluentinc/ccloud-sdk-go-v2/identity-provider v0.3.0 diff --git a/go.sum b/go.sum index d6b9f27d60..522bf6ee76 100644 --- a/go.sum +++ b/go.sum @@ -220,6 +220,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 h1:DVWL3Y4b5azgCA github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0/go.mod h1:P4fdIkI1ynjSvhDEGX283KhtzG51eGHQc5Cqtp7bu1Q= github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 h1:8Y1uXjolI2d5mawcfLn4OfJ81WRMQpjMFWdBm3dLdrk= github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU= +github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 h1:KzlhRDrUsXbs4ZPZy6T9OWmFIVkZWHxNsDorHHSnwFs= +github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0/go.mod h1:CuvhIQpYj/LbQeYzp7Sw2LJkTKzLh8xlFdQoKq9ZQlY= github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 h1:37Gjdo+0Ev3g2NPEXyiVm7yTT85AlWbjXYRLvq6Aj9E= github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0/go.mod h1:jnWqax4kM22sutPGMtGmHqe2usgfqYig4UtmHsLENz0= github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0 h1:xD7CXcyAqezFnSVFB4U27oWUY4FlbyciVP0ftDIiI18= diff --git a/internal/flink/command.go b/internal/flink/command.go index 12f507b843..bc665604f3 100644 --- a/internal/flink/command.go +++ b/internal/flink/command.go @@ -57,6 +57,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { return cmd } +// here func (c *command) addComputePoolFlag(cmd *cobra.Command) { cmd.Flags().String("compute-pool", "", "Flink compute pool ID.") pcmd.RegisterFlagCompletionFunc(cmd, "compute-pool", c.autocompleteComputePools) diff --git a/internal/flink/command_shell.go b/internal/flink/command_shell.go index 7204ae8083..5560f00a28 100644 --- a/internal/flink/command_shell.go +++ b/internal/flink/command_shell.go @@ -42,6 +42,8 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config) c.addDatabaseFlag(cmd) pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) if featureflags.Manager.BoolVariation("cli.flink.internal", cfg.Context(), config.CliLaunchDarklyClient, true, false) { cmd.Flags().StringSlice("config-key", []string{}, "App option keys for local mode.") @@ -65,7 +67,6 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config) addCmfFlagSet(cmd) cobra.CheckErr(cmd.MarkFlagRequired("environment")) - cobra.CheckErr(cmd.MarkFlagRequired("compute-pool")) } return cmd @@ -83,9 +84,20 @@ func (c *command) authenticated(authenticated func(*cobra.Command, []string) err return err } - flinkGatewayClient, err := c.GetFlinkGatewayClient(true) - if err != nil { - return err + var flinkGatewayClient *ccloudv2.FlinkGatewayClient + var errClient error + computePool := c.Context.GetCurrentFlinkComputePool() + + if computePool == "" { + flinkGatewayClient, errClient = c.GetFlinkGatewayClient(false) + if errClient != nil { + return errClient + } + } else { + flinkGatewayClient, errClient = c.GetFlinkGatewayClient(true) + if errClient != nil { + return errClient + } } jwtCtx := &config.Context{State: &config.ContextState{AuthToken: flinkGatewayClient.AuthToken}} @@ -168,12 +180,20 @@ func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Comma catalog = environment.GetDisplayName() } + cloud, err := cmd.Flags().GetString("cloud") + if err != nil { + return err + } + region, err := cmd.Flags().GetString("region") + if err != nil { + return err + } + computePool := c.Context.GetCurrentFlinkComputePool() if computePool == "" { - return errors.NewErrorWithSuggestions( - "no compute pool selected", - "Select a compute pool with `confluent flink compute-pool use` or `--compute-pool`.", - ) + if cloud == "" || region == "" { + return errors.New("Flink cloud and region flags are required when compute pool is not specified.") + } } serviceAccount, err := cmd.Flags().GetString("service-account") @@ -201,9 +221,17 @@ func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Comma return err } - flinkGatewayClient, err := c.GetFlinkGatewayClient(true) - if err != nil { - return err + var flinkGatewayClient *ccloudv2.FlinkGatewayClient + if computePool == "" { + flinkGatewayClient, err = c.GetFlinkGatewayClient(false) + if err != nil { + return err + } + } else { + flinkGatewayClient, err = c.GetFlinkGatewayClient(true) + if err != nil { + return err + } } lspBaseUrl, err := c.getFlinkLanguageServiceUrl(flinkGatewayClient) diff --git a/internal/flink/command_statement.go b/internal/flink/command_statement.go index 3367e38baf..377bac0fa3 100644 --- a/internal/flink/command_statement.go +++ b/internal/flink/command_statement.go @@ -14,7 +14,7 @@ type statementOut struct { CreationDate time.Time `human:"Creation Date" serialized:"creation_date"` Name string `human:"Name" serialized:"name"` Statement string `human:"Statement" serialized:"statement"` - ComputePool string `human:"Compute Pool" serialized:"compute_pool"` + ComputePool string `human:"Compute Pool,omitempty" serialized:"compute_pool,omitempty"` Status string `human:"Status" serialized:"status"` StatusDetail string `human:"Status Detail,omitempty" serialized:"status_detail,omitempty"` LatestOffsets map[string]string `human:"Latest Offsets" serialized:"latest_offsets"` diff --git a/internal/flink/command_statement_create.go b/internal/flink/command_statement_create.go index 940b01e1ea..744421f475 100644 --- a/internal/flink/command_statement_create.go +++ b/internal/flink/command_statement_create.go @@ -2,6 +2,7 @@ package flink import ( "fmt" + "github.com/confluentinc/cli/v4/pkg/ccloudv2" "time" "github.com/spf13/cobra" @@ -45,6 +46,8 @@ func (c *command) newStatementCreateCommand() *cobra.Command { pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) pcmd.AddOutputFlag(cmd) + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) cobra.CheckErr(cmd.MarkFlagRequired("sql")) @@ -63,18 +66,26 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error { } computePool := c.Context.GetCurrentFlinkComputePool() + cloud, err := cmd.Flags().GetString("cloud") + if err != nil { + return err + } + + region, err := cmd.Flags().GetString("region") + if err != nil { + return err + } + if computePool == "" { - return errors.NewErrorWithSuggestions( - "no compute pool selected", - "Select a compute pool with `confluent flink compute-pool use` or `--compute-pool`.", - ) + if cloud == "" || region == "" { + return errors.New("Flink cloud and region flags are required when compute pool is not specified.") + } } name := types.GenerateStatementName() if len(args) == 1 { name = args[0] } - sql, err := cmd.Flags().GetString("sql") if err != nil { return err @@ -109,15 +120,22 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error { statement := flinkgatewayv1.SqlV1Statement{ Name: flinkgatewayv1.PtrString(name), Spec: &flinkgatewayv1.SqlV1StatementSpec{ - Statement: flinkgatewayv1.PtrString(sql), - Properties: &statementProperties, - ComputePoolId: flinkgatewayv1.PtrString(computePool), + Statement: flinkgatewayv1.PtrString(sql), + Properties: &statementProperties, }, } - - client, err := c.GetFlinkGatewayClient(true) - if err != nil { - return err + var client *ccloudv2.FlinkGatewayClient + if computePool != "" { + statement.Spec.ComputePoolId = flinkgatewayv1.PtrString(computePool) + client, err = c.GetFlinkGatewayClient(true) + if err != nil { + return err + } + } else { + client, err = c.GetFlinkGatewayClient(false) + if err != nil { + return err + } } serviceAccount, err := cmd.Flags().GetString("service-account") @@ -129,12 +147,10 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error { if serviceAccount == "" { principal = c.Context.GetUser().GetResourceId() } - statement, err = client.CreateStatement(statement, principal, environmentId, c.Context.LastOrgId) if err != nil { return err } - wait, err := cmd.Flags().GetBool("wait") if err != nil { return err diff --git a/test/fixtures/output/flink/statement/create-without-compute-pool.golden b/test/fixtures/output/flink/statement/create-without-compute-pool.golden new file mode 100644 index 0000000000..554b5cfaaf --- /dev/null +++ b/test/fixtures/output/flink/statement/create-without-compute-pool.golden @@ -0,0 +1,7 @@ +No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026` ++---------------+-------------------------------+ +| Creation Date | 2023-01-01 00:00:00 +0000 UTC | +| Name | my-statement-2 | +| Statement | INSERT * INTO table; | +| Status | PENDING | ++---------------+-------------------------------+ diff --git a/test/flink_test.go b/test/flink_test.go index 34b86fdcb9..bf5dcce22b 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -310,6 +310,7 @@ func (s *CLITestSuite) TestFlinkStatement() { func (s *CLITestSuite) TestFlinkStatementCreate() { tests := []CLITest{ {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456`, fixture: "flink/statement/create.golden"}, + {args: `flink statement create my-statement-2 --sql "INSERT * INTO table;" --cloud aws --region eu-west-1 --service-account sa-123456`, fixture: "flink/statement/create-without-compute-pool.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456`, fixture: "flink/statement/create-service-account-warning.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --wait`, fixture: "flink/statement/create-wait.golden"}, {args: `flink statement create --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 -o yaml`, fixture: "flink/statement/create-no-name-yaml.golden", regex: true}, diff --git a/test/test-server/flink_gateway_router.go b/test/test-server/flink_gateway_router.go index da626fe3ae..65ac6b4c3d 100644 --- a/test/test-server/flink_gateway_router.go +++ b/test/test-server/flink_gateway_router.go @@ -174,7 +174,11 @@ func handleSqlEnvironmentsEnvironmentStatements(t *testing.T) http.HandlerFunc { require.NoError(t, err) statement.Metadata = &flinkgatewayv1.StatementObjectMeta{CreatedAt: flinkgatewayv1.PtrTime(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC))} - statement.Spec.ComputePoolId = flinkgatewayv1.PtrString(validFlinkStatementComputePoolId) + switch strings.ToLower(statement.GetName()) { + case "my-statement": + statement.Spec.ComputePoolId = flinkgatewayv1.PtrString(validFlinkStatementComputePoolId) + } + statement.Status = &flinkgatewayv1.SqlV1StatementStatus{Phase: "PENDING"} err = json.NewEncoder(w).Encode(statement) From 342c3a588f2b6c372d62be22aafd68b2185ae890 Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Thu, 9 Oct 2025 15:48:02 -0500 Subject: [PATCH 2/7] Add --default-pool --- go.mod | 2 +- go.sum | 6 ++-- internal/flink/command_compute_pool.go | 1 + internal/flink/command_compute_pool_create.go | 8 +++++ .../flink/command_compute_pool_describe.go | 1 + internal/flink/command_compute_pool_list.go | 1 + internal/flink/command_compute_pool_update.go | 12 ++++++- .../flink/compute-pool/create-default.golden | 12 +++++++ .../output/flink/compute-pool/create.golden | 23 ++++++------- .../compute-pool/describe-after-use.golden | 23 ++++++------- .../output/flink/compute-pool/describe.golden | 23 ++++++------- .../flink/compute-pool/list-after-use.golden | 8 ++--- .../flink/compute-pool/list-region.golden | 6 ++-- .../output/flink/compute-pool/list.golden | 8 ++--- .../compute-pool/update-after-use.golden | 23 ++++++------- .../flink/compute-pool/update-default.golden | 12 +++++++ .../output/flink/compute-pool/update.golden | 23 ++++++------- test/flink_test.go | 32 ++++++++++--------- test/test-server/fcpm_handlers.go | 2 ++ 19 files changed, 139 insertions(+), 87 deletions(-) create mode 100644 test/fixtures/output/flink/compute-pool/create-default.golden create mode 100644 test/fixtures/output/flink/compute-pool/update-default.golden diff --git a/go.mod b/go.mod index a181e87c29..2458d44600 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.25.0 github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 - github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 + github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 diff --git a/go.sum b/go.sum index 522bf6ee76..a705de5b2e 100644 --- a/go.sum +++ b/go.sum @@ -214,12 +214,10 @@ github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 h1:ISrVOX9qJ2Sxiu/fGBqqH github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0/go.mod h1:zHG/3DzsnoHC81B1AY9K/8bMX3mxbIp5/nHHdypa//w= github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 h1:o1zKZlKbnN9uv+Y8TxwesBRryUl3lEU6lnfndEJigxQ= github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9/go.mod h1:TtTcSfm+/JvnfqEKglOZ32LIcsRbdtrQdI+TtcP7fiU= -github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEtc/k3K11PX7f4Fj7sPFdo= -github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48= +github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 h1:gRRtad0RRit38+54vKg6DtUlTjPjsuKiVSs1fvyP0nk= +github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0/go.mod h1:JHg9yHyCBLL0Zm24skG4pGaSR49IfxPJaQg/HXzMJpw= github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 h1:DVWL3Y4b5azgCADubtyp3EhGZuyJkleINaTy2V3iius= github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0/go.mod h1:P4fdIkI1ynjSvhDEGX283KhtzG51eGHQc5Cqtp7bu1Q= -github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 h1:8Y1uXjolI2d5mawcfLn4OfJ81WRMQpjMFWdBm3dLdrk= -github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU= github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 h1:KzlhRDrUsXbs4ZPZy6T9OWmFIVkZWHxNsDorHHSnwFs= github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0/go.mod h1:CuvhIQpYj/LbQeYzp7Sw2LJkTKzLh8xlFdQoKq9ZQlY= github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 h1:37Gjdo+0Ev3g2NPEXyiVm7yTT85AlWbjXYRLvq6Aj9E= diff --git a/internal/flink/command_compute_pool.go b/internal/flink/command_compute_pool.go index 78dc1cfb28..339e571485 100644 --- a/internal/flink/command_compute_pool.go +++ b/internal/flink/command_compute_pool.go @@ -15,6 +15,7 @@ type computePoolOut struct { Environment string `human:"Environment" serialized:"environment"` CurrentCfu int32 `human:"Current CFU" serialized:"currrent_cfu"` MaxCfu int32 `human:"Max CFU" serialized:"max_cfu"` + DefaultPool bool `human:"Default Pool" serialized:"default_pool"` Cloud string `human:"Cloud" serialized:"cloud"` Region string `human:"Region" serialized:"region"` Status string `human:"Status" serialized:"status"` diff --git a/internal/flink/command_compute_pool_create.go b/internal/flink/command_compute_pool_create.go index a2621768b5..1f7f8c91e0 100644 --- a/internal/flink/command_compute_pool_create.go +++ b/internal/flink/command_compute_pool_create.go @@ -28,6 +28,7 @@ func (c *command) newComputePoolCreateCommand() *cobra.Command { pcmd.AddCloudFlag(cmd) pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) cmd.Flags().Int32("max-cfu", 5, "Maximum number of Confluent Flink Units (CFU).") + cmd.Flags().Bool("default-pool", false, "Indicate whether the Flink compute pool is a default compute pool or not.") pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddOutputFlag(cmd) @@ -53,6 +54,11 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error { return err } + isDefault, err := cmd.Flags().GetBool("default-pool") + if err != nil { + return err + } + environmentId, err := c.Context.EnvironmentId() if err != nil { return err @@ -68,6 +74,7 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error { Cloud: flinkv2.PtrString(cloud), Region: flinkv2.PtrString(region), MaxCfu: flinkv2.PtrInt32(maxCfu), + DefaultPool: flinkv2.PtrBool(isDefault), Environment: &flinkv2.GlobalObjectReference{ Id: environmentId, Related: environment.Metadata.GetSelf(), @@ -88,6 +95,7 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error { Environment: computePool.Spec.Environment.GetId(), CurrentCfu: computePool.Status.GetCurrentCfu(), MaxCfu: computePool.Spec.GetMaxCfu(), + DefaultPool: computePool.Spec.GetDefaultPool(), Cloud: computePool.Spec.GetCloud(), Region: computePool.Spec.GetRegion(), Status: computePool.Status.GetPhase(), diff --git a/internal/flink/command_compute_pool_describe.go b/internal/flink/command_compute_pool_describe.go index 7bfe1b02b1..c5c580ad22 100644 --- a/internal/flink/command_compute_pool_describe.go +++ b/internal/flink/command_compute_pool_describe.go @@ -54,6 +54,7 @@ func (c *command) computePoolDescribe(cmd *cobra.Command, args []string) error { Environment: computePool.Spec.Environment.GetId(), CurrentCfu: computePool.Status.GetCurrentCfu(), MaxCfu: computePool.Spec.GetMaxCfu(), + DefaultPool: computePool.Spec.GetDefaultPool(), Cloud: computePool.Spec.GetCloud(), Region: computePool.Spec.GetRegion(), Status: computePool.Status.GetPhase(), diff --git a/internal/flink/command_compute_pool_list.go b/internal/flink/command_compute_pool_list.go index 87970fcfa5..6977a1007d 100644 --- a/internal/flink/command_compute_pool_list.go +++ b/internal/flink/command_compute_pool_list.go @@ -48,6 +48,7 @@ func (c *command) computePoolList(cmd *cobra.Command, _ []string) error { Environment: computePool.Spec.Environment.GetId(), CurrentCfu: computePool.Status.GetCurrentCfu(), MaxCfu: computePool.Spec.GetMaxCfu(), + DefaultPool: computePool.Spec.GetDefaultPool(), Cloud: computePool.Spec.GetCloud(), Region: computePool.Spec.GetRegion(), Status: computePool.Status.GetPhase(), diff --git a/internal/flink/command_compute_pool_update.go b/internal/flink/command_compute_pool_update.go index c4775afeb7..10492ed6af 100644 --- a/internal/flink/command_compute_pool_update.go +++ b/internal/flink/command_compute_pool_update.go @@ -29,10 +29,11 @@ func (c *command) newComputePoolUpdateCommand() *cobra.Command { cmd.Flags().String("name", "", "Name of the compute pool.") cmd.Flags().Int32("max-cfu", 0, "Maximum number of Confluent Flink Units (CFU).") + cmd.Flags().Bool("default-pool", false, "Indicate whether the Flink compute pool is a default compute pool or not.") pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddOutputFlag(cmd) - cmd.MarkFlagsOneRequired("name", "max-cfu") + cmd.MarkFlagsOneRequired("name", "max-cfu", "default-pool") return cmd } @@ -92,6 +93,14 @@ func (c *command) computePoolUpdate(cmd *cobra.Command, args []string) error { update.Spec.DisplayName = flinkv2.PtrString(name) } + if cmd.Flags().Changed("default-pool") { + defaultPool, err := cmd.Flags().GetBool("default-pool") + if err != nil { + return err + } + update.Spec.DefaultPool = flinkv2.PtrBool(defaultPool) + } + updatedComputePool, err := c.V2Client.UpdateFlinkComputePool(id, update) if err != nil { return err @@ -105,6 +114,7 @@ func (c *command) computePoolUpdate(cmd *cobra.Command, args []string) error { Environment: updatedComputePool.Spec.Environment.GetId(), CurrentCfu: computePool.Status.GetCurrentCfu(), MaxCfu: updatedComputePool.Spec.GetMaxCfu(), + DefaultPool: updatedComputePool.Spec.GetDefaultPool(), Cloud: computePool.Spec.GetCloud(), Region: computePool.Spec.GetRegion(), Status: computePool.Status.GetPhase(), diff --git a/test/fixtures/output/flink/compute-pool/create-default.golden b/test/fixtures/output/flink/compute-pool/create-default.golden new file mode 100644 index 0000000000..33a6ff595b --- /dev/null +++ b/test/fixtures/output/flink/compute-pool/create-default.golden @@ -0,0 +1,12 @@ ++--------------+-------------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool-2 | +| Environment | env-596 | +| Current CFU | 0 | +| Max CFU | 5 | +| Default Pool | true | +| Cloud | AWS | +| Region | us-west-2 | +| Status | PROVISIONING | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/create.golden b/test/fixtures/output/flink/compute-pool/create.golden index 937e8b4d3d..b77c448b29 100644 --- a/test/fixtures/output/flink/compute-pool/create.golden +++ b/test/fixtures/output/flink/compute-pool/create.golden @@ -1,11 +1,12 @@ -+-------------+-----------------+ -| Current | false | -| ID | lfcp-123456 | -| Name | my-compute-pool | -| Environment | env-596 | -| Current CFU | 0 | -| Max CFU | 5 | -| Cloud | AWS | -| Region | us-west-2 | -| Status | PROVISIONING | -+-------------+-----------------+ ++--------------+-----------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool | +| Environment | env-596 | +| Current CFU | 0 | +| Max CFU | 5 | +| Default Pool | false | +| Cloud | AWS | +| Region | us-west-2 | +| Status | PROVISIONING | ++--------------+-----------------+ diff --git a/test/fixtures/output/flink/compute-pool/describe-after-use.golden b/test/fixtures/output/flink/compute-pool/describe-after-use.golden index 98fdf32db9..d4f3454dde 100644 --- a/test/fixtures/output/flink/compute-pool/describe-after-use.golden +++ b/test/fixtures/output/flink/compute-pool/describe-after-use.golden @@ -1,11 +1,12 @@ -+-------------+-------------------+ -| Current | true | -| ID | lfcp-123456 | -| Name | my-compute-pool-1 | -| Environment | env-123 | -| Current CFU | 0 | -| Max CFU | 1 | -| Cloud | AWS | -| Region | eu-west-1 | -| Status | PROVISIONED | -+-------------+-------------------+ ++--------------+-------------------+ +| Current | true | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 1 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/describe.golden b/test/fixtures/output/flink/compute-pool/describe.golden index 316a0384dc..9c4c61c295 100644 --- a/test/fixtures/output/flink/compute-pool/describe.golden +++ b/test/fixtures/output/flink/compute-pool/describe.golden @@ -1,11 +1,12 @@ -+-------------+-------------------+ -| Current | false | -| ID | lfcp-123456 | -| Name | my-compute-pool-1 | -| Environment | env-123 | -| Current CFU | 0 | -| Max CFU | 1 | -| Cloud | AWS | -| Region | eu-west-1 | -| Status | PROVISIONED | -+-------------+-------------------+ ++--------------+-------------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 1 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/list-after-use.golden b/test/fixtures/output/flink/compute-pool/list-after-use.golden index fb9d0f1fb0..720b104ddc 100644 --- a/test/fixtures/output/flink/compute-pool/list-after-use.golden +++ b/test/fixtures/output/flink/compute-pool/list-after-use.golden @@ -1,4 +1,4 @@ - Current | ID | Name | Environment | Current CFU | Max CFU | Cloud | Region | Status -----------+-------------+-------------------+-------------+-------------+---------+-------+-----------+-------------- - * | lfcp-123456 | my-compute-pool-1 | env-123 | 0 | 1 | AWS | eu-west-1 | PROVISIONED - | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | AWS | eu-west-2 | PROVISIONED + Current | ID | Name | Environment | Current CFU | Max CFU | Default Pool | Cloud | Region | Status +----------+-------------+-------------------+-------------+-------------+---------+--------------+-------+-----------+-------------- + * | lfcp-123456 | my-compute-pool-1 | env-123 | 0 | 1 | false | AWS | eu-west-1 | PROVISIONED + | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | false | AWS | eu-west-2 | PROVISIONED diff --git a/test/fixtures/output/flink/compute-pool/list-region.golden b/test/fixtures/output/flink/compute-pool/list-region.golden index 4b81f1fde7..6554c733ff 100644 --- a/test/fixtures/output/flink/compute-pool/list-region.golden +++ b/test/fixtures/output/flink/compute-pool/list-region.golden @@ -1,3 +1,3 @@ - Current | ID | Name | Environment | Current CFU | Max CFU | Cloud | Region | Status -----------+-------------+-------------------+-------------+-------------+---------+-------+-----------+-------------- - | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | AWS | eu-west-2 | PROVISIONED + Current | ID | Name | Environment | Current CFU | Max CFU | Default Pool | Cloud | Region | Status +----------+-------------+-------------------+-------------+-------------+---------+--------------+-------+-----------+-------------- + | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | false | AWS | eu-west-2 | PROVISIONED diff --git a/test/fixtures/output/flink/compute-pool/list.golden b/test/fixtures/output/flink/compute-pool/list.golden index 174bdb42cd..abfae6cf8b 100644 --- a/test/fixtures/output/flink/compute-pool/list.golden +++ b/test/fixtures/output/flink/compute-pool/list.golden @@ -1,4 +1,4 @@ - Current | ID | Name | Environment | Current CFU | Max CFU | Cloud | Region | Status -----------+-------------+-------------------+-------------+-------------+---------+-------+-----------+-------------- - | lfcp-123456 | my-compute-pool-1 | env-123 | 0 | 1 | AWS | eu-west-1 | PROVISIONED - | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | AWS | eu-west-2 | PROVISIONED + Current | ID | Name | Environment | Current CFU | Max CFU | Default Pool | Cloud | Region | Status +----------+-------------+-------------------+-------------+-------------+---------+--------------+-------+-----------+-------------- + | lfcp-123456 | my-compute-pool-1 | env-123 | 0 | 1 | false | AWS | eu-west-1 | PROVISIONED + | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | false | AWS | eu-west-2 | PROVISIONED diff --git a/test/fixtures/output/flink/compute-pool/update-after-use.golden b/test/fixtures/output/flink/compute-pool/update-after-use.golden index 3a00bd72d6..391dd56282 100644 --- a/test/fixtures/output/flink/compute-pool/update-after-use.golden +++ b/test/fixtures/output/flink/compute-pool/update-after-use.golden @@ -1,11 +1,12 @@ -+-------------+-------------------+ -| Current | true | -| ID | lfcp-123456 | -| Name | my-compute-pool-1 | -| Environment | env-123 | -| Current CFU | 0 | -| Max CFU | 5 | -| Cloud | AWS | -| Region | eu-west-1 | -| Status | PROVISIONED | -+-------------+-------------------+ ++--------------+-------------------+ +| Current | true | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 5 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/update-default.golden b/test/fixtures/output/flink/compute-pool/update-default.golden new file mode 100644 index 0000000000..9c4c61c295 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool/update-default.golden @@ -0,0 +1,12 @@ ++--------------+-------------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 1 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/update.golden b/test/fixtures/output/flink/compute-pool/update.golden index ab59f5cb51..a7e403d106 100644 --- a/test/fixtures/output/flink/compute-pool/update.golden +++ b/test/fixtures/output/flink/compute-pool/update.golden @@ -1,11 +1,12 @@ -+-------------+-------------------+ -| Current | false | -| ID | lfcp-123456 | -| Name | my-compute-pool-1 | -| Environment | env-123 | -| Current CFU | 0 | -| Max CFU | 5 | -| Cloud | AWS | -| Region | eu-west-1 | -| Status | PROVISIONED | -+-------------+-------------------+ ++--------------+-------------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 5 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/flink_test.go b/test/flink_test.go index bf5dcce22b..ff259b1e91 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -61,21 +61,6 @@ func (s *CLITestSuite) TestFlinkArtifact() { } } -func (s *CLITestSuite) TestFlinkComputePool() { - tests := []CLITest{ - {args: "flink compute-pool create my-compute-pool --cloud aws --region us-west-2", fixture: "flink/compute-pool/create.golden"}, - {args: "flink compute-pool describe lfcp-123456", fixture: "flink/compute-pool/describe.golden"}, - {args: "flink compute-pool list", fixture: "flink/compute-pool/list.golden"}, - {args: "flink compute-pool list --region eu-west-2", fixture: "flink/compute-pool/list-region.golden"}, - {args: "flink compute-pool update lfcp-123456 --max-cfu 5", fixture: "flink/compute-pool/update.golden"}, - } - - for _, test := range tests { - test.login = "cloud" - s.runIntegrationTest(test) - } -} - func (s *CLITestSuite) TestFlinkConnection() { tests := []CLITest{ {args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"}, @@ -209,6 +194,23 @@ func (s *CLITestSuite) TestFlinkConnectivityType() { } } +func (s *CLITestSuite) TestFlinkComputePool() { + tests := []CLITest{ + {args: "flink compute-pool create my-compute-pool --cloud aws --region us-west-2", fixture: "flink/compute-pool/create.golden"}, + {args: "flink compute-pool describe lfcp-123456", fixture: "flink/compute-pool/describe.golden"}, + {args: "flink compute-pool list", fixture: "flink/compute-pool/list.golden"}, + {args: "flink compute-pool list --region eu-west-2", fixture: "flink/compute-pool/list-region.golden"}, + {args: "flink compute-pool update lfcp-123456 --max-cfu 5", fixture: "flink/compute-pool/update.golden"}, + {args: "flink compute-pool create my-compute-pool-2 --default-pool --cloud aws --region us-west-2", fixture: "flink/compute-pool/create-default.golden"}, + {args: "flink compute-pool update lfcp-123456 --default-pool=false", fixture: "flink/compute-pool/update-default.golden"}, + } + + for _, test := range tests { + test.login = "cloud" + s.runIntegrationTest(test) + } +} + func (s *CLITestSuite) TestFlinkComputePoolDelete() { tests := []CLITest{ {args: "flink compute-pool delete lfcp-123456 --force", fixture: "flink/compute-pool/delete.golden"}, diff --git a/test/test-server/fcpm_handlers.go b/test/test-server/fcpm_handlers.go index 20cee51a14..34d6e34d3b 100644 --- a/test/test-server/fcpm_handlers.go +++ b/test/test-server/fcpm_handlers.go @@ -56,6 +56,7 @@ func handleFcpmComputePools(t *testing.T) http.HandlerFunc { err := json.NewDecoder(r.Body).Decode(create) require.NoError(t, err) create.Spec.Cloud = flinkv2.PtrString(strings.ToUpper(create.Spec.GetCloud())) + create.Spec.DefaultPool = flinkv2.PtrBool(create.Spec.GetDefaultPool()) v := flinkv2.FcpmV2ComputePool{ Id: flinkv2.PtrString("lfcp-123456"), @@ -105,6 +106,7 @@ func handleFcpmComputePoolsId(t *testing.T) http.HandlerFunc { Spec: &flinkv2.FcpmV2ComputePoolSpec{ DisplayName: flinkv2.PtrString("my-compute-pool-1"), MaxCfu: flinkv2.PtrInt32(update.Spec.GetMaxCfu()), + DefaultPool: flinkv2.PtrBool(update.Spec.GetDefaultPool()), Cloud: flinkv2.PtrString("AWS"), Region: flinkv2.PtrString("eu-west-1"), Environment: &flinkv2.GlobalObjectReference{Id: "env-123"}, From 86fe4b6165c4fa7613ae0cf1600079bde54acfae Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Fri, 10 Oct 2025 16:17:21 -0500 Subject: [PATCH 3/7] Add compute pool config --- internal/flink/command.go | 1 + internal/flink/command_compute_pool_config.go | 22 ++++++ .../command_compute_pool_config_describe.go | 34 ++++++++ .../command_compute_pool_config_update.go | 78 +++++++++++++++++++ pkg/ccloudv2/flink.go | 10 +++ .../describe-compute-pool-config.golden | 4 + .../update-compute-pool-config.golden | 4 + test/flink_test.go | 12 +++ test/test-server/ccloudv2_router.go | 1 + test/test-server/fcpm_handlers.go | 28 +++++++ 10 files changed, 194 insertions(+) create mode 100644 internal/flink/command_compute_pool_config.go create mode 100644 internal/flink/command_compute_pool_config_describe.go create mode 100644 internal/flink/command_compute_pool_config_update.go create mode 100644 test/fixtures/output/flink/compute-pool/describe-compute-pool-config.golden create mode 100644 test/fixtures/output/flink/compute-pool/update-compute-pool-config.golden diff --git a/internal/flink/command.go b/internal/flink/command.go index bc665604f3..0f17a718bb 100644 --- a/internal/flink/command.go +++ b/internal/flink/command.go @@ -49,6 +49,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { // Cloud Specific Commands cmd.AddCommand(c.newArtifactCommand()) + cmd.AddCommand(c.newComputePoolConfigCommand()) cmd.AddCommand(c.newConnectionCommand()) cmd.AddCommand(c.newConnectivityTypeCommand()) cmd.AddCommand(c.newEndpointCommand()) diff --git a/internal/flink/command_compute_pool_config.go b/internal/flink/command_compute_pool_config.go new file mode 100644 index 0000000000..f73ca4a11e --- /dev/null +++ b/internal/flink/command_compute_pool_config.go @@ -0,0 +1,22 @@ +package flink + +import ( + "github.com/spf13/cobra" +) + +type computePoolConfigOut struct { + DefaultPoolEnabled bool `human:"Default Pool Enabled" serialized:"default_pool_enabled"` + DefaultPoolMaxCFU int32 `human:"Default Pool Max CFU" serialized:"default_pool_max_cfu"` +} + +func (c *command) newComputePoolConfigCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "compute-pool-config", + Short: "Manage Flink compute pools configs.", + } + + cmd.AddCommand(c.newComputePoolConfigDescribeCommand()) + cmd.AddCommand(c.newComputePoolConfigUpdateCommand()) + + return cmd +} diff --git a/internal/flink/command_compute_pool_config_describe.go b/internal/flink/command_compute_pool_config_describe.go new file mode 100644 index 0000000000..41f0d92e9e --- /dev/null +++ b/internal/flink/command_compute_pool_config_describe.go @@ -0,0 +1,34 @@ +package flink + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *command) newComputePoolConfigDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe", + Short: "Describe a Flink compute pool config.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + RunE: c.computePoolConfigDescribe, + } + + pcmd.AddOutputFlag(cmd) + return cmd +} + +func (c *command) computePoolConfigDescribe(cmd *cobra.Command, args []string) error { + + computePoolConfig, err := c.V2Client.DescribeFlinkComputePoolConfig() + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&computePoolConfigOut{ + DefaultPoolEnabled: computePoolConfig.Spec.GetDefaultPoolEnabled(), + DefaultPoolMaxCFU: computePoolConfig.Spec.GetDefaultPoolMaxCfu(), + }) + return table.Print() +} diff --git a/internal/flink/command_compute_pool_config_update.go b/internal/flink/command_compute_pool_config_update.go new file mode 100644 index 0000000000..644f06b33f --- /dev/null +++ b/internal/flink/command_compute_pool_config_update.go @@ -0,0 +1,78 @@ +package flink + +import ( + flinkv2 "github.com/confluentinc/ccloud-sdk-go-v2/flink/v2" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/examples" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *command) newComputePoolConfigUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update", + Short: "Update a Flink compute pool config.", + Args: cobra.MaximumNArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validComputePoolArgs), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + RunE: c.computePoolConfigUpdate, + Example: examples.BuildExampleString( + examples.Example{ + Text: `Update CFU count of a Flink compute pool config.`, + Code: `confluent flink compute-pool update --max-cfu 5`, + }, + ), + } + + cmd.Flags().Int32("max-cfu", -1, "Maximum number of Confluent Flink Units (CFU).") + cmd.Flags().Bool("default-pool", false, "Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to.") + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cmd.MarkFlagsOneRequired("max-cfu", "default-pool") + + return cmd +} + +func (c *command) computePoolConfigUpdate(cmd *cobra.Command, args []string) error { + + computePoolConfig, err := c.V2Client.DescribeFlinkComputePoolConfig() + if err != nil { + return err + } + + update := flinkv2.FcpmV2OrgComputePoolConfigUpdate{ + Spec: &flinkv2.FcpmV2OrgComputePoolConfigSpec{ + DefaultPoolEnabled: flinkv2.PtrBool(computePoolConfig.Spec.GetDefaultPoolEnabled()), + DefaultPoolMaxCfu: flinkv2.PtrInt32(computePoolConfig.Spec.GetDefaultPoolMaxCfu()), + }, + } + + if cmd.Flags().Changed("default-pool") { + defaultPool, err := cmd.Flags().GetBool("default-pool") + if err != nil { + return err + } + update.Spec.DefaultPoolEnabled = flinkv2.PtrBool(defaultPool) + } + + maxCfu, err := cmd.Flags().GetInt32("max-cfu") + if err != nil { + return err + } + if maxCfu != -1 { + update.Spec.DefaultPoolMaxCfu = flinkv2.PtrInt32(maxCfu) + } + + updatedComputePoolConfig, err := c.V2Client.UpdateFlinkComputePoolConfig(update) + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&computePoolConfigOut{ + DefaultPoolEnabled: updatedComputePoolConfig.Spec.GetDefaultPoolEnabled(), + DefaultPoolMaxCFU: updatedComputePoolConfig.Spec.GetDefaultPoolMaxCfu(), + }) + return table.Print() +} diff --git a/pkg/ccloudv2/flink.go b/pkg/ccloudv2/flink.go index aa7fd709cb..2d96a0f45b 100644 --- a/pkg/ccloudv2/flink.go +++ b/pkg/ccloudv2/flink.go @@ -38,6 +38,11 @@ func (c *Client) DescribeFlinkComputePool(id, environment string) (flinkv2.FcpmV return res, errors.CatchComputePoolNotFoundError(err, id, httpResp) } +func (c *Client) DescribeFlinkComputePoolConfig() (flinkv2.FcpmV2OrgComputePoolConfig, error) { + res, httpResp, err := c.FlinkClient.OrgComputePoolConfigsFcpmV2Api.GetFcpmV2OrgComputePoolConfig(c.flinkApiContext()).Execute() + return res, errors.CatchCCloudV2Error(err, httpResp) +} + func (c *Client) ListFlinkComputePools(environment, specRegion string) ([]flinkv2.FcpmV2ComputePool, error) { var list []flinkv2.FcpmV2ComputePool @@ -109,3 +114,8 @@ func (c *Client) UpdateFlinkComputePool(id string, update flinkv2.FcpmV2ComputeP res, httpResp, err := c.FlinkClient.ComputePoolsFcpmV2Api.UpdateFcpmV2ComputePool(c.flinkApiContext(), id).FcpmV2ComputePoolUpdate(update).Execute() return res, errors.CatchCCloudV2Error(err, httpResp) } + +func (c *Client) UpdateFlinkComputePoolConfig(fcpmV2OrgComputePoolConfigUpdate flinkv2.FcpmV2OrgComputePoolConfigUpdate) (flinkv2.FcpmV2OrgComputePoolConfig, error) { + res, httpResp, err := c.FlinkClient.OrgComputePoolConfigsFcpmV2Api.UpdateFcpmV2OrgComputePoolConfig(c.flinkApiContext()).FcpmV2OrgComputePoolConfigUpdate(fcpmV2OrgComputePoolConfigUpdate).Execute() + return res, errors.CatchCCloudV2Error(err, httpResp) +} diff --git a/test/fixtures/output/flink/compute-pool/describe-compute-pool-config.golden b/test/fixtures/output/flink/compute-pool/describe-compute-pool-config.golden new file mode 100644 index 0000000000..a113099c87 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool/describe-compute-pool-config.golden @@ -0,0 +1,4 @@ ++----------------------+-------+ +| Default Pool Enabled | false | +| Default Pool Max CFU | 50 | ++----------------------+-------+ diff --git a/test/fixtures/output/flink/compute-pool/update-compute-pool-config.golden b/test/fixtures/output/flink/compute-pool/update-compute-pool-config.golden new file mode 100644 index 0000000000..d1e139c180 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool/update-compute-pool-config.golden @@ -0,0 +1,4 @@ ++----------------------+------+ +| Default Pool Enabled | true | +| Default Pool Max CFU | 5 | ++----------------------+------+ diff --git a/test/flink_test.go b/test/flink_test.go index ff259b1e91..6f872a8043 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -253,6 +253,18 @@ func (s *CLITestSuite) TestFlinkComputePoolUse() { } } +func (s *CLITestSuite) TestFlinkComputePoolConfig() { + tests := []CLITest{ + {args: "flink compute-pool-config describe", fixture: "flink/compute-pool/describe-compute-pool-config.golden"}, + {args: "flink compute-pool-config update --max-cfu 5 --default-pool", fixture: "flink/compute-pool/update-compute-pool-config.golden"}, + } + + for _, test := range tests { + test.login = "cloud" + s.runIntegrationTest(test) + } +} + func (s *CLITestSuite) TestFlinkRegion() { tests := []CLITest{ {args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"}, diff --git a/test/test-server/ccloudv2_router.go b/test/test-server/ccloudv2_router.go index ad8acd47d6..65e5146315 100644 --- a/test/test-server/ccloudv2_router.go +++ b/test/test-server/ccloudv2_router.go @@ -56,6 +56,7 @@ var ccloudV2Routes = []route{ {"/connect/v1/custom-connector-runtimes", handleListCustomConnectorRuntimes}, {"/connect/v1/dummy-presigned-url", handleCustomPluginUploadFile}, {"/fcpm/v2/compute-pools", handleFcpmComputePools}, + {"/fcpm/v2/compute-pool-config", handleFcpmComputePoolConfigs}, {"/fcpm/v2/compute-pools/{id}", handleFcpmComputePoolsId}, {"/fcpm/v2/regions", handleFcpmRegions}, {"/iam/v2/api-keys", handleIamApiKeys}, diff --git a/test/test-server/fcpm_handlers.go b/test/test-server/fcpm_handlers.go index 34d6e34d3b..7a0edba26b 100644 --- a/test/test-server/fcpm_handlers.go +++ b/test/test-server/fcpm_handlers.go @@ -69,6 +69,34 @@ func handleFcpmComputePools(t *testing.T) http.HandlerFunc { } } +func handleFcpmComputePoolConfigs(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + config1 := flinkv2.FcpmV2OrgComputePoolConfig{ + Spec: &flinkv2.FcpmV2OrgComputePoolConfigSpec{ + DefaultPoolEnabled: flinkv2.PtrBool(false), + DefaultPoolMaxCfu: flinkv2.PtrInt32(50), + }, + } + err := json.NewEncoder(w).Encode(config1) + require.NoError(t, err) + case http.MethodPatch: + update := new(flinkv2.FcpmV2OrgComputePoolConfig) + err := json.NewDecoder(r.Body).Decode(update) + require.NoError(t, err) + config1 := flinkv2.FcpmV2OrgComputePoolConfig{ + Spec: &flinkv2.FcpmV2OrgComputePoolConfigSpec{ + DefaultPoolEnabled: flinkv2.PtrBool(update.Spec.GetDefaultPoolEnabled()), + DefaultPoolMaxCfu: flinkv2.PtrInt32(update.Spec.GetDefaultPoolMaxCfu()), + }, + } + err = json.NewEncoder(w).Encode(config1) + require.NoError(t, err) + } + } +} + func handleFcpmComputePoolsId(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var computePool flinkv2.FcpmV2ComputePool From 288e84c08344cc18eef0ccf057a40d7e301031d8 Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Fri, 17 Oct 2025 10:51:53 -0500 Subject: [PATCH 4/7] Tests fix --- .../flink/compute-pool/create-help.golden | 1 + .../flink/compute-pool/update-help.golden | 1 + test/fixtures/output/flink/help-onprem.golden | 10 +++++----- test/fixtures/output/flink/help.golden | 17 +++++++++-------- test/fixtures/output/flink/shell-help.golden | 2 ++ .../output/flink/statement/create-help.golden | 2 ++ 6 files changed, 20 insertions(+), 13 deletions(-) diff --git a/test/fixtures/output/flink/compute-pool/create-help.golden b/test/fixtures/output/flink/compute-pool/create-help.golden index 8de3410ec2..8cc34e527f 100644 --- a/test/fixtures/output/flink/compute-pool/create-help.golden +++ b/test/fixtures/output/flink/compute-pool/create-help.golden @@ -12,6 +12,7 @@ Flags: --cloud string REQUIRED: Specify the cloud provider as "aws", "azure", or "gcp". --region string REQUIRED: Cloud region for Flink (use "confluent flink region list" to see all). --max-cfu int32 Maximum number of Confluent Flink Units (CFU). (default 5) + --default-pool Indicate whether the Flink compute pool is a default compute pool or not. --environment string Environment ID. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") diff --git a/test/fixtures/output/flink/compute-pool/update-help.golden b/test/fixtures/output/flink/compute-pool/update-help.golden index f5b6e18c98..96d727f956 100644 --- a/test/fixtures/output/flink/compute-pool/update-help.golden +++ b/test/fixtures/output/flink/compute-pool/update-help.golden @@ -11,6 +11,7 @@ Update name and CFU count of a Flink compute pool. Flags: --name string Name of the compute pool. --max-cfu int32 Maximum number of Confluent Flink Units (CFU). + --default-pool Indicate whether the Flink compute pool is a default compute pool or not. --environment string Environment ID. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") diff --git a/test/fixtures/output/flink/help-onprem.golden b/test/fixtures/output/flink/help-onprem.golden index c50aa1f00e..98a40ac7f7 100644 --- a/test/fixtures/output/flink/help-onprem.golden +++ b/test/fixtures/output/flink/help-onprem.golden @@ -4,11 +4,11 @@ Usage: confluent flink [command] Available Commands: - application Manage Flink applications. - catalog Manage Flink catalogs in Confluent Platform. - compute-pool Manage Flink compute pools. - environment Manage Flink environments. - statement Manage Flink SQL statements. + application Manage Flink applications. + catalog Manage Flink catalogs in Confluent Platform. + compute-pool Manage Flink compute pools. + environment Manage Flink environments. + statement Manage Flink SQL statements. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/help.golden b/test/fixtures/output/flink/help.golden index 6f3124ea73..6c9992c751 100644 --- a/test/fixtures/output/flink/help.golden +++ b/test/fixtures/output/flink/help.golden @@ -4,14 +4,15 @@ Usage: confluent flink [command] Available Commands: - artifact Manage Flink UDF artifacts. - compute-pool Manage Flink compute pools. - connection Manage Flink connections. - connectivity-type Manage Flink connectivity type. - endpoint Manage Flink endpoint. - region Manage Flink regions. - shell Start Flink interactive SQL client. - statement Manage Flink SQL statements. + artifact Manage Flink UDF artifacts. + compute-pool Manage Flink compute pools. + compute-pool-config Manage Flink compute pools configs. + connection Manage Flink connections. + connectivity-type Manage Flink connectivity type. + endpoint Manage Flink endpoint. + region Manage Flink regions. + shell Start Flink interactive SQL client. + statement Manage Flink SQL statements. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/shell-help.golden b/test/fixtures/output/flink/shell-help.golden index b8a114664f..840e136649 100644 --- a/test/fixtures/output/flink/shell-help.golden +++ b/test/fixtures/output/flink/shell-help.golden @@ -12,6 +12,8 @@ Flags: --database string The database which will be used as the default database. When using Kafka, this is the cluster ID. --environment string Environment ID. --context string CLI context name. + --cloud string Specify the cloud provider as "aws", "azure", or "gcp". + --region string Cloud region for Flink (use "confluent flink region list" to see all). Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/statement/create-help.golden b/test/fixtures/output/flink/statement/create-help.golden index 8ebca61d1f..da60ffb363 100644 --- a/test/fixtures/output/flink/statement/create-help.golden +++ b/test/fixtures/output/flink/statement/create-help.golden @@ -22,6 +22,8 @@ Flags: --environment string Environment ID. --context string CLI context name. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + --cloud string Specify the cloud provider as "aws", "azure", or "gcp". + --region string Cloud region for Flink (use "confluent flink region list" to see all). Global Flags: -h, --help Show help for this command. From 7a86a9354d0ca13c5d3cb47eedefdf6f03a94fe7 Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Fri, 17 Oct 2025 14:10:47 -0500 Subject: [PATCH 5/7] Tests fix --- .../command_compute_pool_config_update.go | 4 ++-- .../compute-pool-config/describe-help.golden | 12 +++++++++++ .../flink/compute-pool-config/help.golden | 15 ++++++++++++++ .../compute-pool-config/update-help.golden | 20 +++++++++++++++++++ 4 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 test/fixtures/output/flink/compute-pool-config/describe-help.golden create mode 100644 test/fixtures/output/flink/compute-pool-config/help.golden create mode 100644 test/fixtures/output/flink/compute-pool-config/update-help.golden diff --git a/internal/flink/command_compute_pool_config_update.go b/internal/flink/command_compute_pool_config_update.go index 644f06b33f..1ead164497 100644 --- a/internal/flink/command_compute_pool_config_update.go +++ b/internal/flink/command_compute_pool_config_update.go @@ -24,8 +24,8 @@ func (c *command) newComputePoolConfigUpdateCommand() *cobra.Command { ), } - cmd.Flags().Int32("max-cfu", -1, "Maximum number of Confluent Flink Units (CFU).") - cmd.Flags().Bool("default-pool", false, "Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to.") + cmd.Flags().Int32("max-cfu", -1, "Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to.") + cmd.Flags().Bool("default-pool", false, "Whether default compute pools are enabled for the organization") pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddOutputFlag(cmd) diff --git a/test/fixtures/output/flink/compute-pool-config/describe-help.golden b/test/fixtures/output/flink/compute-pool-config/describe-help.golden new file mode 100644 index 0000000000..ba7d141896 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool-config/describe-help.golden @@ -0,0 +1,12 @@ +Describe a Flink compute pool config. + +Usage: + confluent flink compute-pool-config describe [flags] + +Flags: + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/compute-pool-config/help.golden b/test/fixtures/output/flink/compute-pool-config/help.golden new file mode 100644 index 0000000000..0296ca14a6 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool-config/help.golden @@ -0,0 +1,15 @@ +Manage Flink compute pools configs. + +Usage: + confluent flink compute-pool-config [command] + +Available Commands: + describe Describe a Flink compute pool config. + update Update a Flink compute pool config. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + +Use "confluent flink compute-pool-config [command] --help" for more information about a command. diff --git a/test/fixtures/output/flink/compute-pool-config/update-help.golden b/test/fixtures/output/flink/compute-pool-config/update-help.golden new file mode 100644 index 0000000000..f860f536e1 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool-config/update-help.golden @@ -0,0 +1,20 @@ +Update a Flink compute pool config. + +Usage: + confluent flink compute-pool-config update [flags] + +Examples: +Update CFU count of a Flink compute pool config. + + $ confluent flink compute-pool update --max-cfu 5 + +Flags: + --max-cfu int32 Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to. (default -1) + --default-pool Whether default compute pools are enabled for the organization + --environment string Environment ID. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). From b41d251dc2fe73932ee140084ff9134906e1dad4 Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Fri, 17 Oct 2025 15:10:09 -0500 Subject: [PATCH 6/7] Lint --- internal/flink/command_compute_pool_config_describe.go | 4 ++-- internal/flink/command_compute_pool_config_update.go | 7 ++++--- internal/flink/command_shell.go | 4 ++-- internal/flink/command_statement_create.go | 2 +- .../output/flink/compute-pool-config/update-help.golden | 2 +- test/fixtures/output/flink/shell-help.golden | 2 +- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/internal/flink/command_compute_pool_config_describe.go b/internal/flink/command_compute_pool_config_describe.go index 41f0d92e9e..1f8dfc1010 100644 --- a/internal/flink/command_compute_pool_config_describe.go +++ b/internal/flink/command_compute_pool_config_describe.go @@ -1,9 +1,10 @@ package flink import ( + "github.com/spf13/cobra" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/output" - "github.com/spf13/cobra" ) func (c *command) newComputePoolConfigDescribeCommand() *cobra.Command { @@ -19,7 +20,6 @@ func (c *command) newComputePoolConfigDescribeCommand() *cobra.Command { } func (c *command) computePoolConfigDescribe(cmd *cobra.Command, args []string) error { - computePoolConfig, err := c.V2Client.DescribeFlinkComputePoolConfig() if err != nil { return err diff --git a/internal/flink/command_compute_pool_config_update.go b/internal/flink/command_compute_pool_config_update.go index 1ead164497..35fc98e560 100644 --- a/internal/flink/command_compute_pool_config_update.go +++ b/internal/flink/command_compute_pool_config_update.go @@ -1,11 +1,13 @@ package flink import ( + "github.com/spf13/cobra" + flinkv2 "github.com/confluentinc/ccloud-sdk-go-v2/flink/v2" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/examples" "github.com/confluentinc/cli/v4/pkg/output" - "github.com/spf13/cobra" ) func (c *command) newComputePoolConfigUpdateCommand() *cobra.Command { @@ -25,7 +27,7 @@ func (c *command) newComputePoolConfigUpdateCommand() *cobra.Command { } cmd.Flags().Int32("max-cfu", -1, "Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to.") - cmd.Flags().Bool("default-pool", false, "Whether default compute pools are enabled for the organization") + cmd.Flags().Bool("default-pool", false, "Whether default compute pools are enabled for the organization.") pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddOutputFlag(cmd) @@ -35,7 +37,6 @@ func (c *command) newComputePoolConfigUpdateCommand() *cobra.Command { } func (c *command) computePoolConfigUpdate(cmd *cobra.Command, args []string) error { - computePoolConfig, err := c.V2Client.DescribeFlinkComputePoolConfig() if err != nil { return err diff --git a/internal/flink/command_shell.go b/internal/flink/command_shell.go index 5560f00a28..cd9f305922 100644 --- a/internal/flink/command_shell.go +++ b/internal/flink/command_shell.go @@ -37,10 +37,10 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config) cmd.RunE = func(cmd *cobra.Command, args []string) error { return c.startFlinkSqlClient(prerunner, cmd) } + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) c.addComputePoolFlag(cmd) pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand) c.addDatabaseFlag(cmd) - pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) pcmd.AddCloudFlag(cmd) pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) @@ -59,8 +59,8 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config) cmd.RunE = func(cmd *cobra.Command, args []string) error { return c.startFlinkSqlClientOnPrem(prerunner, cmd) } - cmd.Flags().String("compute-pool", "", "The compute pool name to execute the Flink SQL statement.") cmd.Flags().String("environment", "", "Name of the Flink environment.") + cmd.Flags().String("compute-pool", "", "The compute pool name to execute the Flink SQL statement.") cmd.Flags().String("catalog", "", "The name of the default catalog.") cmd.Flags().String("database", "", "The name of the default database.") cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration.") diff --git a/internal/flink/command_statement_create.go b/internal/flink/command_statement_create.go index 744421f475..eda49f8606 100644 --- a/internal/flink/command_statement_create.go +++ b/internal/flink/command_statement_create.go @@ -2,13 +2,13 @@ package flink import ( "fmt" - "github.com/confluentinc/cli/v4/pkg/ccloudv2" "time" "github.com/spf13/cobra" flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1" + "github.com/confluentinc/cli/v4/pkg/ccloudv2" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/examples" diff --git a/test/fixtures/output/flink/compute-pool-config/update-help.golden b/test/fixtures/output/flink/compute-pool-config/update-help.golden index f860f536e1..7edde51ee6 100644 --- a/test/fixtures/output/flink/compute-pool-config/update-help.golden +++ b/test/fixtures/output/flink/compute-pool-config/update-help.golden @@ -10,7 +10,7 @@ Update CFU count of a Flink compute pool config. Flags: --max-cfu int32 Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to. (default -1) - --default-pool Whether default compute pools are enabled for the organization + --default-pool Whether default compute pools are enabled for the organization. --environment string Environment ID. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") diff --git a/test/fixtures/output/flink/shell-help.golden b/test/fixtures/output/flink/shell-help.golden index 840e136649..186a202b01 100644 --- a/test/fixtures/output/flink/shell-help.golden +++ b/test/fixtures/output/flink/shell-help.golden @@ -7,10 +7,10 @@ Examples: For a Quick Start with examples in context, see https://docs.confluent.io/cloud/current/flink/get-started/quick-start-shell.html. Flags: + --environment string Environment ID. --compute-pool string Flink compute pool ID. --service-account string Service account ID. --database string The database which will be used as the default database. When using Kafka, this is the cluster ID. - --environment string Environment ID. --context string CLI context name. --cloud string Specify the cloud provider as "aws", "azure", or "gcp". --region string Cloud region for Flink (use "confluent flink region list" to see all). From 25301fedbaa6dab6610715639e890111b874abcd Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Mon, 10 Nov 2025 15:23:00 -0500 Subject: [PATCH 7/7] Nits --- internal/flink/command.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/flink/command.go b/internal/flink/command.go index 0f17a718bb..7fc2831f9d 100644 --- a/internal/flink/command.go +++ b/internal/flink/command.go @@ -58,7 +58,6 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { return cmd } -// here func (c *command) addComputePoolFlag(cmd *cobra.Command) { cmd.Flags().String("compute-pool", "", "Flink compute pool ID.") pcmd.RegisterFlagCompletionFunc(cmd, "compute-pool", c.autocompleteComputePools)