diff --git a/go.mod b/go.mod index dc0e179098..2458d44600 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/confluentinc/cli/v4 -go 1.24.6 +go 1.24.7 require ( github.com/antihax/optional v1.0.0 @@ -28,9 +28,9 @@ require ( github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.25.0 github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 - github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 + github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 - github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 + github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0 github.com/confluentinc/ccloud-sdk-go-v2/identity-provider v0.3.0 diff --git a/go.sum b/go.sum index d6b9f27d60..a705de5b2e 100644 --- a/go.sum +++ b/go.sum @@ -214,12 +214,12 @@ github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 h1:ISrVOX9qJ2Sxiu/fGBqqH github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0/go.mod h1:zHG/3DzsnoHC81B1AY9K/8bMX3mxbIp5/nHHdypa//w= github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 h1:o1zKZlKbnN9uv+Y8TxwesBRryUl3lEU6lnfndEJigxQ= github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9/go.mod h1:TtTcSfm+/JvnfqEKglOZ32LIcsRbdtrQdI+TtcP7fiU= -github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEtc/k3K11PX7f4Fj7sPFdo= -github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48= +github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 h1:gRRtad0RRit38+54vKg6DtUlTjPjsuKiVSs1fvyP0nk= +github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0/go.mod h1:JHg9yHyCBLL0Zm24skG4pGaSR49IfxPJaQg/HXzMJpw= github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 h1:DVWL3Y4b5azgCADubtyp3EhGZuyJkleINaTy2V3iius= github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0/go.mod h1:P4fdIkI1ynjSvhDEGX283KhtzG51eGHQc5Cqtp7bu1Q= -github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 h1:8Y1uXjolI2d5mawcfLn4OfJ81WRMQpjMFWdBm3dLdrk= -github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU= +github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 h1:KzlhRDrUsXbs4ZPZy6T9OWmFIVkZWHxNsDorHHSnwFs= +github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0/go.mod h1:CuvhIQpYj/LbQeYzp7Sw2LJkTKzLh8xlFdQoKq9ZQlY= github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 h1:37Gjdo+0Ev3g2NPEXyiVm7yTT85AlWbjXYRLvq6Aj9E= github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0/go.mod h1:jnWqax4kM22sutPGMtGmHqe2usgfqYig4UtmHsLENz0= github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0 h1:xD7CXcyAqezFnSVFB4U27oWUY4FlbyciVP0ftDIiI18= diff --git a/internal/flink/command.go b/internal/flink/command.go index 12f507b843..7fc2831f9d 100644 --- a/internal/flink/command.go +++ b/internal/flink/command.go @@ -49,6 +49,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { // Cloud Specific Commands cmd.AddCommand(c.newArtifactCommand()) + cmd.AddCommand(c.newComputePoolConfigCommand()) cmd.AddCommand(c.newConnectionCommand()) cmd.AddCommand(c.newConnectivityTypeCommand()) cmd.AddCommand(c.newEndpointCommand()) diff --git a/internal/flink/command_compute_pool.go b/internal/flink/command_compute_pool.go index 78dc1cfb28..339e571485 100644 --- a/internal/flink/command_compute_pool.go +++ b/internal/flink/command_compute_pool.go @@ -15,6 +15,7 @@ type computePoolOut struct { Environment string `human:"Environment" serialized:"environment"` CurrentCfu int32 `human:"Current CFU" serialized:"currrent_cfu"` MaxCfu int32 `human:"Max CFU" serialized:"max_cfu"` + DefaultPool bool `human:"Default Pool" serialized:"default_pool"` Cloud string `human:"Cloud" serialized:"cloud"` Region string `human:"Region" serialized:"region"` Status string `human:"Status" serialized:"status"` diff --git a/internal/flink/command_compute_pool_config.go b/internal/flink/command_compute_pool_config.go new file mode 100644 index 0000000000..f73ca4a11e --- /dev/null +++ b/internal/flink/command_compute_pool_config.go @@ -0,0 +1,22 @@ +package flink + +import ( + "github.com/spf13/cobra" +) + +type computePoolConfigOut struct { + DefaultPoolEnabled bool `human:"Default Pool Enabled" serialized:"default_pool_enabled"` + DefaultPoolMaxCFU int32 `human:"Default Pool Max CFU" serialized:"default_pool_max_cfu"` +} + +func (c *command) newComputePoolConfigCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "compute-pool-config", + Short: "Manage Flink compute pools configs.", + } + + cmd.AddCommand(c.newComputePoolConfigDescribeCommand()) + cmd.AddCommand(c.newComputePoolConfigUpdateCommand()) + + return cmd +} diff --git a/internal/flink/command_compute_pool_config_describe.go b/internal/flink/command_compute_pool_config_describe.go new file mode 100644 index 0000000000..1f8dfc1010 --- /dev/null +++ b/internal/flink/command_compute_pool_config_describe.go @@ -0,0 +1,34 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newComputePoolConfigDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe", + Short: "Describe a Flink compute pool config.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + RunE: c.computePoolConfigDescribe, + } + + pcmd.AddOutputFlag(cmd) + return cmd +} + +func (c *command) computePoolConfigDescribe(cmd *cobra.Command, args []string) error { + computePoolConfig, err := c.V2Client.DescribeFlinkComputePoolConfig() + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&computePoolConfigOut{ + DefaultPoolEnabled: computePoolConfig.Spec.GetDefaultPoolEnabled(), + DefaultPoolMaxCFU: computePoolConfig.Spec.GetDefaultPoolMaxCfu(), + }) + return table.Print() +} diff --git a/internal/flink/command_compute_pool_config_update.go b/internal/flink/command_compute_pool_config_update.go new file mode 100644 index 0000000000..35fc98e560 --- /dev/null +++ b/internal/flink/command_compute_pool_config_update.go @@ -0,0 +1,79 @@ +package flink + +import ( + "github.com/spf13/cobra" + + flinkv2 "github.com/confluentinc/ccloud-sdk-go-v2/flink/v2" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/examples" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newComputePoolConfigUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update", + Short: "Update a Flink compute pool config.", + Args: cobra.MaximumNArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validComputePoolArgs), + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + RunE: c.computePoolConfigUpdate, + Example: examples.BuildExampleString( + examples.Example{ + Text: `Update CFU count of a Flink compute pool config.`, + Code: `confluent flink compute-pool update --max-cfu 5`, + }, + ), + } + + cmd.Flags().Int32("max-cfu", -1, "Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to.") + cmd.Flags().Bool("default-pool", false, "Whether default compute pools are enabled for the organization.") + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cmd.MarkFlagsOneRequired("max-cfu", "default-pool") + + return cmd +} + +func (c *command) computePoolConfigUpdate(cmd *cobra.Command, args []string) error { + computePoolConfig, err := c.V2Client.DescribeFlinkComputePoolConfig() + if err != nil { + return err + } + + update := flinkv2.FcpmV2OrgComputePoolConfigUpdate{ + Spec: &flinkv2.FcpmV2OrgComputePoolConfigSpec{ + DefaultPoolEnabled: flinkv2.PtrBool(computePoolConfig.Spec.GetDefaultPoolEnabled()), + DefaultPoolMaxCfu: flinkv2.PtrInt32(computePoolConfig.Spec.GetDefaultPoolMaxCfu()), + }, + } + + if cmd.Flags().Changed("default-pool") { + defaultPool, err := cmd.Flags().GetBool("default-pool") + if err != nil { + return err + } + update.Spec.DefaultPoolEnabled = flinkv2.PtrBool(defaultPool) + } + + maxCfu, err := cmd.Flags().GetInt32("max-cfu") + if err != nil { + return err + } + if maxCfu != -1 { + update.Spec.DefaultPoolMaxCfu = flinkv2.PtrInt32(maxCfu) + } + + updatedComputePoolConfig, err := c.V2Client.UpdateFlinkComputePoolConfig(update) + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&computePoolConfigOut{ + DefaultPoolEnabled: updatedComputePoolConfig.Spec.GetDefaultPoolEnabled(), + DefaultPoolMaxCFU: updatedComputePoolConfig.Spec.GetDefaultPoolMaxCfu(), + }) + return table.Print() +} diff --git a/internal/flink/command_compute_pool_create.go b/internal/flink/command_compute_pool_create.go index a2621768b5..1f7f8c91e0 100644 --- a/internal/flink/command_compute_pool_create.go +++ b/internal/flink/command_compute_pool_create.go @@ -28,6 +28,7 @@ func (c *command) newComputePoolCreateCommand() *cobra.Command { pcmd.AddCloudFlag(cmd) pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) cmd.Flags().Int32("max-cfu", 5, "Maximum number of Confluent Flink Units (CFU).") + cmd.Flags().Bool("default-pool", false, "Indicate whether the Flink compute pool is a default compute pool or not.") pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddOutputFlag(cmd) @@ -53,6 +54,11 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error { return err } + isDefault, err := cmd.Flags().GetBool("default-pool") + if err != nil { + return err + } + environmentId, err := c.Context.EnvironmentId() if err != nil { return err @@ -68,6 +74,7 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error { Cloud: flinkv2.PtrString(cloud), Region: flinkv2.PtrString(region), MaxCfu: flinkv2.PtrInt32(maxCfu), + DefaultPool: flinkv2.PtrBool(isDefault), Environment: &flinkv2.GlobalObjectReference{ Id: environmentId, Related: environment.Metadata.GetSelf(), @@ -88,6 +95,7 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error { Environment: computePool.Spec.Environment.GetId(), CurrentCfu: computePool.Status.GetCurrentCfu(), MaxCfu: computePool.Spec.GetMaxCfu(), + DefaultPool: computePool.Spec.GetDefaultPool(), Cloud: computePool.Spec.GetCloud(), Region: computePool.Spec.GetRegion(), Status: computePool.Status.GetPhase(), diff --git a/internal/flink/command_compute_pool_describe.go b/internal/flink/command_compute_pool_describe.go index 7bfe1b02b1..c5c580ad22 100644 --- a/internal/flink/command_compute_pool_describe.go +++ b/internal/flink/command_compute_pool_describe.go @@ -54,6 +54,7 @@ func (c *command) computePoolDescribe(cmd *cobra.Command, args []string) error { Environment: computePool.Spec.Environment.GetId(), CurrentCfu: computePool.Status.GetCurrentCfu(), MaxCfu: computePool.Spec.GetMaxCfu(), + DefaultPool: computePool.Spec.GetDefaultPool(), Cloud: computePool.Spec.GetCloud(), Region: computePool.Spec.GetRegion(), Status: computePool.Status.GetPhase(), diff --git a/internal/flink/command_compute_pool_list.go b/internal/flink/command_compute_pool_list.go index 87970fcfa5..6977a1007d 100644 --- a/internal/flink/command_compute_pool_list.go +++ b/internal/flink/command_compute_pool_list.go @@ -48,6 +48,7 @@ func (c *command) computePoolList(cmd *cobra.Command, _ []string) error { Environment: computePool.Spec.Environment.GetId(), CurrentCfu: computePool.Status.GetCurrentCfu(), MaxCfu: computePool.Spec.GetMaxCfu(), + DefaultPool: computePool.Spec.GetDefaultPool(), Cloud: computePool.Spec.GetCloud(), Region: computePool.Spec.GetRegion(), Status: computePool.Status.GetPhase(), diff --git a/internal/flink/command_compute_pool_update.go b/internal/flink/command_compute_pool_update.go index c4775afeb7..10492ed6af 100644 --- a/internal/flink/command_compute_pool_update.go +++ b/internal/flink/command_compute_pool_update.go @@ -29,10 +29,11 @@ func (c *command) newComputePoolUpdateCommand() *cobra.Command { cmd.Flags().String("name", "", "Name of the compute pool.") cmd.Flags().Int32("max-cfu", 0, "Maximum number of Confluent Flink Units (CFU).") + cmd.Flags().Bool("default-pool", false, "Indicate whether the Flink compute pool is a default compute pool or not.") pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddOutputFlag(cmd) - cmd.MarkFlagsOneRequired("name", "max-cfu") + cmd.MarkFlagsOneRequired("name", "max-cfu", "default-pool") return cmd } @@ -92,6 +93,14 @@ func (c *command) computePoolUpdate(cmd *cobra.Command, args []string) error { update.Spec.DisplayName = flinkv2.PtrString(name) } + if cmd.Flags().Changed("default-pool") { + defaultPool, err := cmd.Flags().GetBool("default-pool") + if err != nil { + return err + } + update.Spec.DefaultPool = flinkv2.PtrBool(defaultPool) + } + updatedComputePool, err := c.V2Client.UpdateFlinkComputePool(id, update) if err != nil { return err @@ -105,6 +114,7 @@ func (c *command) computePoolUpdate(cmd *cobra.Command, args []string) error { Environment: updatedComputePool.Spec.Environment.GetId(), CurrentCfu: computePool.Status.GetCurrentCfu(), MaxCfu: updatedComputePool.Spec.GetMaxCfu(), + DefaultPool: updatedComputePool.Spec.GetDefaultPool(), Cloud: computePool.Spec.GetCloud(), Region: computePool.Spec.GetRegion(), Status: computePool.Status.GetPhase(), diff --git a/internal/flink/command_shell.go b/internal/flink/command_shell.go index 7204ae8083..cd9f305922 100644 --- a/internal/flink/command_shell.go +++ b/internal/flink/command_shell.go @@ -37,11 +37,13 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config) cmd.RunE = func(cmd *cobra.Command, args []string) error { return c.startFlinkSqlClient(prerunner, cmd) } + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) c.addComputePoolFlag(cmd) pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand) c.addDatabaseFlag(cmd) - pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) if featureflags.Manager.BoolVariation("cli.flink.internal", cfg.Context(), config.CliLaunchDarklyClient, true, false) { cmd.Flags().StringSlice("config-key", []string{}, "App option keys for local mode.") @@ -57,15 +59,14 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config) cmd.RunE = func(cmd *cobra.Command, args []string) error { return c.startFlinkSqlClientOnPrem(prerunner, cmd) } - cmd.Flags().String("compute-pool", "", "The compute pool name to execute the Flink SQL statement.") cmd.Flags().String("environment", "", "Name of the Flink environment.") + cmd.Flags().String("compute-pool", "", "The compute pool name to execute the Flink SQL statement.") cmd.Flags().String("catalog", "", "The name of the default catalog.") cmd.Flags().String("database", "", "The name of the default database.") cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration.") addCmfFlagSet(cmd) cobra.CheckErr(cmd.MarkFlagRequired("environment")) - cobra.CheckErr(cmd.MarkFlagRequired("compute-pool")) } return cmd @@ -83,9 +84,20 @@ func (c *command) authenticated(authenticated func(*cobra.Command, []string) err return err } - flinkGatewayClient, err := c.GetFlinkGatewayClient(true) - if err != nil { - return err + var flinkGatewayClient *ccloudv2.FlinkGatewayClient + var errClient error + computePool := c.Context.GetCurrentFlinkComputePool() + + if computePool == "" { + flinkGatewayClient, errClient = c.GetFlinkGatewayClient(false) + if errClient != nil { + return errClient + } + } else { + flinkGatewayClient, errClient = c.GetFlinkGatewayClient(true) + if errClient != nil { + return errClient + } } jwtCtx := &config.Context{State: &config.ContextState{AuthToken: flinkGatewayClient.AuthToken}} @@ -168,12 +180,20 @@ func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Comma catalog = environment.GetDisplayName() } + cloud, err := cmd.Flags().GetString("cloud") + if err != nil { + return err + } + region, err := cmd.Flags().GetString("region") + if err != nil { + return err + } + computePool := c.Context.GetCurrentFlinkComputePool() if computePool == "" { - return errors.NewErrorWithSuggestions( - "no compute pool selected", - "Select a compute pool with `confluent flink compute-pool use` or `--compute-pool`.", - ) + if cloud == "" || region == "" { + return errors.New("Flink cloud and region flags are required when compute pool is not specified.") + } } serviceAccount, err := cmd.Flags().GetString("service-account") @@ -201,9 +221,17 @@ func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Comma return err } - flinkGatewayClient, err := c.GetFlinkGatewayClient(true) - if err != nil { - return err + var flinkGatewayClient *ccloudv2.FlinkGatewayClient + if computePool == "" { + flinkGatewayClient, err = c.GetFlinkGatewayClient(false) + if err != nil { + return err + } + } else { + flinkGatewayClient, err = c.GetFlinkGatewayClient(true) + if err != nil { + return err + } } lspBaseUrl, err := c.getFlinkLanguageServiceUrl(flinkGatewayClient) diff --git a/internal/flink/command_statement.go b/internal/flink/command_statement.go index 3367e38baf..377bac0fa3 100644 --- a/internal/flink/command_statement.go +++ b/internal/flink/command_statement.go @@ -14,7 +14,7 @@ type statementOut struct { CreationDate time.Time `human:"Creation Date" serialized:"creation_date"` Name string `human:"Name" serialized:"name"` Statement string `human:"Statement" serialized:"statement"` - ComputePool string `human:"Compute Pool" serialized:"compute_pool"` + ComputePool string `human:"Compute Pool,omitempty" serialized:"compute_pool,omitempty"` Status string `human:"Status" serialized:"status"` StatusDetail string `human:"Status Detail,omitempty" serialized:"status_detail,omitempty"` LatestOffsets map[string]string `human:"Latest Offsets" serialized:"latest_offsets"` diff --git a/internal/flink/command_statement_create.go b/internal/flink/command_statement_create.go index 940b01e1ea..eda49f8606 100644 --- a/internal/flink/command_statement_create.go +++ b/internal/flink/command_statement_create.go @@ -8,6 +8,7 @@ import ( flinkgatewayv1 "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1" + "github.com/confluentinc/cli/v4/pkg/ccloudv2" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" "github.com/confluentinc/cli/v4/pkg/errors" "github.com/confluentinc/cli/v4/pkg/examples" @@ -45,6 +46,8 @@ func (c *command) newStatementCreateCommand() *cobra.Command { pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) pcmd.AddOutputFlag(cmd) + pcmd.AddCloudFlag(cmd) + pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand) cobra.CheckErr(cmd.MarkFlagRequired("sql")) @@ -63,18 +66,26 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error { } computePool := c.Context.GetCurrentFlinkComputePool() + cloud, err := cmd.Flags().GetString("cloud") + if err != nil { + return err + } + + region, err := cmd.Flags().GetString("region") + if err != nil { + return err + } + if computePool == "" { - return errors.NewErrorWithSuggestions( - "no compute pool selected", - "Select a compute pool with `confluent flink compute-pool use` or `--compute-pool`.", - ) + if cloud == "" || region == "" { + return errors.New("Flink cloud and region flags are required when compute pool is not specified.") + } } name := types.GenerateStatementName() if len(args) == 1 { name = args[0] } - sql, err := cmd.Flags().GetString("sql") if err != nil { return err @@ -109,15 +120,22 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error { statement := flinkgatewayv1.SqlV1Statement{ Name: flinkgatewayv1.PtrString(name), Spec: &flinkgatewayv1.SqlV1StatementSpec{ - Statement: flinkgatewayv1.PtrString(sql), - Properties: &statementProperties, - ComputePoolId: flinkgatewayv1.PtrString(computePool), + Statement: flinkgatewayv1.PtrString(sql), + Properties: &statementProperties, }, } - - client, err := c.GetFlinkGatewayClient(true) - if err != nil { - return err + var client *ccloudv2.FlinkGatewayClient + if computePool != "" { + statement.Spec.ComputePoolId = flinkgatewayv1.PtrString(computePool) + client, err = c.GetFlinkGatewayClient(true) + if err != nil { + return err + } + } else { + client, err = c.GetFlinkGatewayClient(false) + if err != nil { + return err + } } serviceAccount, err := cmd.Flags().GetString("service-account") @@ -129,12 +147,10 @@ func (c *command) statementCreate(cmd *cobra.Command, args []string) error { if serviceAccount == "" { principal = c.Context.GetUser().GetResourceId() } - statement, err = client.CreateStatement(statement, principal, environmentId, c.Context.LastOrgId) if err != nil { return err } - wait, err := cmd.Flags().GetBool("wait") if err != nil { return err diff --git a/pkg/ccloudv2/flink.go b/pkg/ccloudv2/flink.go index aa7fd709cb..2d96a0f45b 100644 --- a/pkg/ccloudv2/flink.go +++ b/pkg/ccloudv2/flink.go @@ -38,6 +38,11 @@ func (c *Client) DescribeFlinkComputePool(id, environment string) (flinkv2.FcpmV return res, errors.CatchComputePoolNotFoundError(err, id, httpResp) } +func (c *Client) DescribeFlinkComputePoolConfig() (flinkv2.FcpmV2OrgComputePoolConfig, error) { + res, httpResp, err := c.FlinkClient.OrgComputePoolConfigsFcpmV2Api.GetFcpmV2OrgComputePoolConfig(c.flinkApiContext()).Execute() + return res, errors.CatchCCloudV2Error(err, httpResp) +} + func (c *Client) ListFlinkComputePools(environment, specRegion string) ([]flinkv2.FcpmV2ComputePool, error) { var list []flinkv2.FcpmV2ComputePool @@ -109,3 +114,8 @@ func (c *Client) UpdateFlinkComputePool(id string, update flinkv2.FcpmV2ComputeP res, httpResp, err := c.FlinkClient.ComputePoolsFcpmV2Api.UpdateFcpmV2ComputePool(c.flinkApiContext(), id).FcpmV2ComputePoolUpdate(update).Execute() return res, errors.CatchCCloudV2Error(err, httpResp) } + +func (c *Client) UpdateFlinkComputePoolConfig(fcpmV2OrgComputePoolConfigUpdate flinkv2.FcpmV2OrgComputePoolConfigUpdate) (flinkv2.FcpmV2OrgComputePoolConfig, error) { + res, httpResp, err := c.FlinkClient.OrgComputePoolConfigsFcpmV2Api.UpdateFcpmV2OrgComputePoolConfig(c.flinkApiContext()).FcpmV2OrgComputePoolConfigUpdate(fcpmV2OrgComputePoolConfigUpdate).Execute() + return res, errors.CatchCCloudV2Error(err, httpResp) +} diff --git a/test/fixtures/output/flink/compute-pool-config/describe-help.golden b/test/fixtures/output/flink/compute-pool-config/describe-help.golden new file mode 100644 index 0000000000..ba7d141896 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool-config/describe-help.golden @@ -0,0 +1,12 @@ +Describe a Flink compute pool config. + +Usage: + confluent flink compute-pool-config describe [flags] + +Flags: + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/compute-pool-config/help.golden b/test/fixtures/output/flink/compute-pool-config/help.golden new file mode 100644 index 0000000000..0296ca14a6 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool-config/help.golden @@ -0,0 +1,15 @@ +Manage Flink compute pools configs. + +Usage: + confluent flink compute-pool-config [command] + +Available Commands: + describe Describe a Flink compute pool config. + update Update a Flink compute pool config. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + +Use "confluent flink compute-pool-config [command] --help" for more information about a command. diff --git a/test/fixtures/output/flink/compute-pool-config/update-help.golden b/test/fixtures/output/flink/compute-pool-config/update-help.golden new file mode 100644 index 0000000000..7edde51ee6 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool-config/update-help.golden @@ -0,0 +1,20 @@ +Update a Flink compute pool config. + +Usage: + confluent flink compute-pool-config update [flags] + +Examples: +Update CFU count of a Flink compute pool config. + + $ confluent flink compute-pool update --max-cfu 5 + +Flags: + --max-cfu int32 Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to. (default -1) + --default-pool Whether default compute pools are enabled for the organization. + --environment string Environment ID. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/compute-pool/create-default.golden b/test/fixtures/output/flink/compute-pool/create-default.golden new file mode 100644 index 0000000000..33a6ff595b --- /dev/null +++ b/test/fixtures/output/flink/compute-pool/create-default.golden @@ -0,0 +1,12 @@ ++--------------+-------------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool-2 | +| Environment | env-596 | +| Current CFU | 0 | +| Max CFU | 5 | +| Default Pool | true | +| Cloud | AWS | +| Region | us-west-2 | +| Status | PROVISIONING | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/create-help.golden b/test/fixtures/output/flink/compute-pool/create-help.golden index 8de3410ec2..8cc34e527f 100644 --- a/test/fixtures/output/flink/compute-pool/create-help.golden +++ b/test/fixtures/output/flink/compute-pool/create-help.golden @@ -12,6 +12,7 @@ Flags: --cloud string REQUIRED: Specify the cloud provider as "aws", "azure", or "gcp". --region string REQUIRED: Cloud region for Flink (use "confluent flink region list" to see all). --max-cfu int32 Maximum number of Confluent Flink Units (CFU). (default 5) + --default-pool Indicate whether the Flink compute pool is a default compute pool or not. --environment string Environment ID. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") diff --git a/test/fixtures/output/flink/compute-pool/create.golden b/test/fixtures/output/flink/compute-pool/create.golden index 937e8b4d3d..b77c448b29 100644 --- a/test/fixtures/output/flink/compute-pool/create.golden +++ b/test/fixtures/output/flink/compute-pool/create.golden @@ -1,11 +1,12 @@ -+-------------+-----------------+ -| Current | false | -| ID | lfcp-123456 | -| Name | my-compute-pool | -| Environment | env-596 | -| Current CFU | 0 | -| Max CFU | 5 | -| Cloud | AWS | -| Region | us-west-2 | -| Status | PROVISIONING | -+-------------+-----------------+ ++--------------+-----------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool | +| Environment | env-596 | +| Current CFU | 0 | +| Max CFU | 5 | +| Default Pool | false | +| Cloud | AWS | +| Region | us-west-2 | +| Status | PROVISIONING | ++--------------+-----------------+ diff --git a/test/fixtures/output/flink/compute-pool/describe-after-use.golden b/test/fixtures/output/flink/compute-pool/describe-after-use.golden index 98fdf32db9..d4f3454dde 100644 --- a/test/fixtures/output/flink/compute-pool/describe-after-use.golden +++ b/test/fixtures/output/flink/compute-pool/describe-after-use.golden @@ -1,11 +1,12 @@ -+-------------+-------------------+ -| Current | true | -| ID | lfcp-123456 | -| Name | my-compute-pool-1 | -| Environment | env-123 | -| Current CFU | 0 | -| Max CFU | 1 | -| Cloud | AWS | -| Region | eu-west-1 | -| Status | PROVISIONED | -+-------------+-------------------+ ++--------------+-------------------+ +| Current | true | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 1 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/describe-compute-pool-config.golden b/test/fixtures/output/flink/compute-pool/describe-compute-pool-config.golden new file mode 100644 index 0000000000..a113099c87 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool/describe-compute-pool-config.golden @@ -0,0 +1,4 @@ ++----------------------+-------+ +| Default Pool Enabled | false | +| Default Pool Max CFU | 50 | ++----------------------+-------+ diff --git a/test/fixtures/output/flink/compute-pool/describe.golden b/test/fixtures/output/flink/compute-pool/describe.golden index 316a0384dc..9c4c61c295 100644 --- a/test/fixtures/output/flink/compute-pool/describe.golden +++ b/test/fixtures/output/flink/compute-pool/describe.golden @@ -1,11 +1,12 @@ -+-------------+-------------------+ -| Current | false | -| ID | lfcp-123456 | -| Name | my-compute-pool-1 | -| Environment | env-123 | -| Current CFU | 0 | -| Max CFU | 1 | -| Cloud | AWS | -| Region | eu-west-1 | -| Status | PROVISIONED | -+-------------+-------------------+ ++--------------+-------------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 1 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/list-after-use.golden b/test/fixtures/output/flink/compute-pool/list-after-use.golden index fb9d0f1fb0..720b104ddc 100644 --- a/test/fixtures/output/flink/compute-pool/list-after-use.golden +++ b/test/fixtures/output/flink/compute-pool/list-after-use.golden @@ -1,4 +1,4 @@ - Current | ID | Name | Environment | Current CFU | Max CFU | Cloud | Region | Status -----------+-------------+-------------------+-------------+-------------+---------+-------+-----------+-------------- - * | lfcp-123456 | my-compute-pool-1 | env-123 | 0 | 1 | AWS | eu-west-1 | PROVISIONED - | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | AWS | eu-west-2 | PROVISIONED + Current | ID | Name | Environment | Current CFU | Max CFU | Default Pool | Cloud | Region | Status +----------+-------------+-------------------+-------------+-------------+---------+--------------+-------+-----------+-------------- + * | lfcp-123456 | my-compute-pool-1 | env-123 | 0 | 1 | false | AWS | eu-west-1 | PROVISIONED + | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | false | AWS | eu-west-2 | PROVISIONED diff --git a/test/fixtures/output/flink/compute-pool/list-region.golden b/test/fixtures/output/flink/compute-pool/list-region.golden index 4b81f1fde7..6554c733ff 100644 --- a/test/fixtures/output/flink/compute-pool/list-region.golden +++ b/test/fixtures/output/flink/compute-pool/list-region.golden @@ -1,3 +1,3 @@ - Current | ID | Name | Environment | Current CFU | Max CFU | Cloud | Region | Status -----------+-------------+-------------------+-------------+-------------+---------+-------+-----------+-------------- - | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | AWS | eu-west-2 | PROVISIONED + Current | ID | Name | Environment | Current CFU | Max CFU | Default Pool | Cloud | Region | Status +----------+-------------+-------------------+-------------+-------------+---------+--------------+-------+-----------+-------------- + | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | false | AWS | eu-west-2 | PROVISIONED diff --git a/test/fixtures/output/flink/compute-pool/list.golden b/test/fixtures/output/flink/compute-pool/list.golden index 174bdb42cd..abfae6cf8b 100644 --- a/test/fixtures/output/flink/compute-pool/list.golden +++ b/test/fixtures/output/flink/compute-pool/list.golden @@ -1,4 +1,4 @@ - Current | ID | Name | Environment | Current CFU | Max CFU | Cloud | Region | Status -----------+-------------+-------------------+-------------+-------------+---------+-------+-----------+-------------- - | lfcp-123456 | my-compute-pool-1 | env-123 | 0 | 1 | AWS | eu-west-1 | PROVISIONED - | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | AWS | eu-west-2 | PROVISIONED + Current | ID | Name | Environment | Current CFU | Max CFU | Default Pool | Cloud | Region | Status +----------+-------------+-------------------+-------------+-------------+---------+--------------+-------+-----------+-------------- + | lfcp-123456 | my-compute-pool-1 | env-123 | 0 | 1 | false | AWS | eu-west-1 | PROVISIONED + | lfcp-222222 | my-compute-pool-2 | env-456 | 0 | 2 | false | AWS | eu-west-2 | PROVISIONED diff --git a/test/fixtures/output/flink/compute-pool/update-after-use.golden b/test/fixtures/output/flink/compute-pool/update-after-use.golden index 3a00bd72d6..391dd56282 100644 --- a/test/fixtures/output/flink/compute-pool/update-after-use.golden +++ b/test/fixtures/output/flink/compute-pool/update-after-use.golden @@ -1,11 +1,12 @@ -+-------------+-------------------+ -| Current | true | -| ID | lfcp-123456 | -| Name | my-compute-pool-1 | -| Environment | env-123 | -| Current CFU | 0 | -| Max CFU | 5 | -| Cloud | AWS | -| Region | eu-west-1 | -| Status | PROVISIONED | -+-------------+-------------------+ ++--------------+-------------------+ +| Current | true | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 5 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/update-compute-pool-config.golden b/test/fixtures/output/flink/compute-pool/update-compute-pool-config.golden new file mode 100644 index 0000000000..d1e139c180 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool/update-compute-pool-config.golden @@ -0,0 +1,4 @@ ++----------------------+------+ +| Default Pool Enabled | true | +| Default Pool Max CFU | 5 | ++----------------------+------+ diff --git a/test/fixtures/output/flink/compute-pool/update-default.golden b/test/fixtures/output/flink/compute-pool/update-default.golden new file mode 100644 index 0000000000..9c4c61c295 --- /dev/null +++ b/test/fixtures/output/flink/compute-pool/update-default.golden @@ -0,0 +1,12 @@ ++--------------+-------------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 1 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/compute-pool/update-help.golden b/test/fixtures/output/flink/compute-pool/update-help.golden index f5b6e18c98..96d727f956 100644 --- a/test/fixtures/output/flink/compute-pool/update-help.golden +++ b/test/fixtures/output/flink/compute-pool/update-help.golden @@ -11,6 +11,7 @@ Update name and CFU count of a Flink compute pool. Flags: --name string Name of the compute pool. --max-cfu int32 Maximum number of Confluent Flink Units (CFU). + --default-pool Indicate whether the Flink compute pool is a default compute pool or not. --environment string Environment ID. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") diff --git a/test/fixtures/output/flink/compute-pool/update.golden b/test/fixtures/output/flink/compute-pool/update.golden index ab59f5cb51..a7e403d106 100644 --- a/test/fixtures/output/flink/compute-pool/update.golden +++ b/test/fixtures/output/flink/compute-pool/update.golden @@ -1,11 +1,12 @@ -+-------------+-------------------+ -| Current | false | -| ID | lfcp-123456 | -| Name | my-compute-pool-1 | -| Environment | env-123 | -| Current CFU | 0 | -| Max CFU | 5 | -| Cloud | AWS | -| Region | eu-west-1 | -| Status | PROVISIONED | -+-------------+-------------------+ ++--------------+-------------------+ +| Current | false | +| ID | lfcp-123456 | +| Name | my-compute-pool-1 | +| Environment | env-123 | +| Current CFU | 0 | +| Max CFU | 5 | +| Default Pool | false | +| Cloud | AWS | +| Region | eu-west-1 | +| Status | PROVISIONED | ++--------------+-------------------+ diff --git a/test/fixtures/output/flink/help-onprem.golden b/test/fixtures/output/flink/help-onprem.golden index c50aa1f00e..98a40ac7f7 100644 --- a/test/fixtures/output/flink/help-onprem.golden +++ b/test/fixtures/output/flink/help-onprem.golden @@ -4,11 +4,11 @@ Usage: confluent flink [command] Available Commands: - application Manage Flink applications. - catalog Manage Flink catalogs in Confluent Platform. - compute-pool Manage Flink compute pools. - environment Manage Flink environments. - statement Manage Flink SQL statements. + application Manage Flink applications. + catalog Manage Flink catalogs in Confluent Platform. + compute-pool Manage Flink compute pools. + environment Manage Flink environments. + statement Manage Flink SQL statements. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/help.golden b/test/fixtures/output/flink/help.golden index 6f3124ea73..6c9992c751 100644 --- a/test/fixtures/output/flink/help.golden +++ b/test/fixtures/output/flink/help.golden @@ -4,14 +4,15 @@ Usage: confluent flink [command] Available Commands: - artifact Manage Flink UDF artifacts. - compute-pool Manage Flink compute pools. - connection Manage Flink connections. - connectivity-type Manage Flink connectivity type. - endpoint Manage Flink endpoint. - region Manage Flink regions. - shell Start Flink interactive SQL client. - statement Manage Flink SQL statements. + artifact Manage Flink UDF artifacts. + compute-pool Manage Flink compute pools. + compute-pool-config Manage Flink compute pools configs. + connection Manage Flink connections. + connectivity-type Manage Flink connectivity type. + endpoint Manage Flink endpoint. + region Manage Flink regions. + shell Start Flink interactive SQL client. + statement Manage Flink SQL statements. Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/shell-help.golden b/test/fixtures/output/flink/shell-help.golden index b8a114664f..186a202b01 100644 --- a/test/fixtures/output/flink/shell-help.golden +++ b/test/fixtures/output/flink/shell-help.golden @@ -7,11 +7,13 @@ Examples: For a Quick Start with examples in context, see https://docs.confluent.io/cloud/current/flink/get-started/quick-start-shell.html. Flags: + --environment string Environment ID. --compute-pool string Flink compute pool ID. --service-account string Service account ID. --database string The database which will be used as the default database. When using Kafka, this is the cluster ID. - --environment string Environment ID. --context string CLI context name. + --cloud string Specify the cloud provider as "aws", "azure", or "gcp". + --region string Cloud region for Flink (use "confluent flink region list" to see all). Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/statement/create-help.golden b/test/fixtures/output/flink/statement/create-help.golden index 8ebca61d1f..da60ffb363 100644 --- a/test/fixtures/output/flink/statement/create-help.golden +++ b/test/fixtures/output/flink/statement/create-help.golden @@ -22,6 +22,8 @@ Flags: --environment string Environment ID. --context string CLI context name. -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + --cloud string Specify the cloud provider as "aws", "azure", or "gcp". + --region string Cloud region for Flink (use "confluent flink region list" to see all). Global Flags: -h, --help Show help for this command. diff --git a/test/fixtures/output/flink/statement/create-without-compute-pool.golden b/test/fixtures/output/flink/statement/create-without-compute-pool.golden new file mode 100644 index 0000000000..554b5cfaaf --- /dev/null +++ b/test/fixtures/output/flink/statement/create-without-compute-pool.golden @@ -0,0 +1,7 @@ +No Flink endpoint is specified, defaulting to public endpoint: `http://127.0.0.1:1026` ++---------------+-------------------------------+ +| Creation Date | 2023-01-01 00:00:00 +0000 UTC | +| Name | my-statement-2 | +| Statement | INSERT * INTO table; | +| Status | PENDING | ++---------------+-------------------------------+ diff --git a/test/flink_test.go b/test/flink_test.go index 34b86fdcb9..6f872a8043 100644 --- a/test/flink_test.go +++ b/test/flink_test.go @@ -61,21 +61,6 @@ func (s *CLITestSuite) TestFlinkArtifact() { } } -func (s *CLITestSuite) TestFlinkComputePool() { - tests := []CLITest{ - {args: "flink compute-pool create my-compute-pool --cloud aws --region us-west-2", fixture: "flink/compute-pool/create.golden"}, - {args: "flink compute-pool describe lfcp-123456", fixture: "flink/compute-pool/describe.golden"}, - {args: "flink compute-pool list", fixture: "flink/compute-pool/list.golden"}, - {args: "flink compute-pool list --region eu-west-2", fixture: "flink/compute-pool/list-region.golden"}, - {args: "flink compute-pool update lfcp-123456 --max-cfu 5", fixture: "flink/compute-pool/update.golden"}, - } - - for _, test := range tests { - test.login = "cloud" - s.runIntegrationTest(test) - } -} - func (s *CLITestSuite) TestFlinkConnection() { tests := []CLITest{ {args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"}, @@ -209,6 +194,23 @@ func (s *CLITestSuite) TestFlinkConnectivityType() { } } +func (s *CLITestSuite) TestFlinkComputePool() { + tests := []CLITest{ + {args: "flink compute-pool create my-compute-pool --cloud aws --region us-west-2", fixture: "flink/compute-pool/create.golden"}, + {args: "flink compute-pool describe lfcp-123456", fixture: "flink/compute-pool/describe.golden"}, + {args: "flink compute-pool list", fixture: "flink/compute-pool/list.golden"}, + {args: "flink compute-pool list --region eu-west-2", fixture: "flink/compute-pool/list-region.golden"}, + {args: "flink compute-pool update lfcp-123456 --max-cfu 5", fixture: "flink/compute-pool/update.golden"}, + {args: "flink compute-pool create my-compute-pool-2 --default-pool --cloud aws --region us-west-2", fixture: "flink/compute-pool/create-default.golden"}, + {args: "flink compute-pool update lfcp-123456 --default-pool=false", fixture: "flink/compute-pool/update-default.golden"}, + } + + for _, test := range tests { + test.login = "cloud" + s.runIntegrationTest(test) + } +} + func (s *CLITestSuite) TestFlinkComputePoolDelete() { tests := []CLITest{ {args: "flink compute-pool delete lfcp-123456 --force", fixture: "flink/compute-pool/delete.golden"}, @@ -251,6 +253,18 @@ func (s *CLITestSuite) TestFlinkComputePoolUse() { } } +func (s *CLITestSuite) TestFlinkComputePoolConfig() { + tests := []CLITest{ + {args: "flink compute-pool-config describe", fixture: "flink/compute-pool/describe-compute-pool-config.golden"}, + {args: "flink compute-pool-config update --max-cfu 5 --default-pool", fixture: "flink/compute-pool/update-compute-pool-config.golden"}, + } + + for _, test := range tests { + test.login = "cloud" + s.runIntegrationTest(test) + } +} + func (s *CLITestSuite) TestFlinkRegion() { tests := []CLITest{ {args: "flink region use --cloud aws --region eu-west-1", fixture: "flink/region/use-aws.golden"}, @@ -310,6 +324,7 @@ func (s *CLITestSuite) TestFlinkStatement() { func (s *CLITestSuite) TestFlinkStatementCreate() { tests := []CLITest{ {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456`, fixture: "flink/statement/create.golden"}, + {args: `flink statement create my-statement-2 --sql "INSERT * INTO table;" --cloud aws --region eu-west-1 --service-account sa-123456`, fixture: "flink/statement/create-without-compute-pool.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456`, fixture: "flink/statement/create-service-account-warning.golden"}, {args: `flink statement create my-statement --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 --wait`, fixture: "flink/statement/create-wait.golden"}, {args: `flink statement create --sql "INSERT * INTO table;" --compute-pool lfcp-123456 --service-account sa-123456 -o yaml`, fixture: "flink/statement/create-no-name-yaml.golden", regex: true}, diff --git a/test/test-server/ccloudv2_router.go b/test/test-server/ccloudv2_router.go index ad8acd47d6..65e5146315 100644 --- a/test/test-server/ccloudv2_router.go +++ b/test/test-server/ccloudv2_router.go @@ -56,6 +56,7 @@ var ccloudV2Routes = []route{ {"/connect/v1/custom-connector-runtimes", handleListCustomConnectorRuntimes}, {"/connect/v1/dummy-presigned-url", handleCustomPluginUploadFile}, {"/fcpm/v2/compute-pools", handleFcpmComputePools}, + {"/fcpm/v2/compute-pool-config", handleFcpmComputePoolConfigs}, {"/fcpm/v2/compute-pools/{id}", handleFcpmComputePoolsId}, {"/fcpm/v2/regions", handleFcpmRegions}, {"/iam/v2/api-keys", handleIamApiKeys}, diff --git a/test/test-server/fcpm_handlers.go b/test/test-server/fcpm_handlers.go index 20cee51a14..7a0edba26b 100644 --- a/test/test-server/fcpm_handlers.go +++ b/test/test-server/fcpm_handlers.go @@ -56,6 +56,7 @@ func handleFcpmComputePools(t *testing.T) http.HandlerFunc { err := json.NewDecoder(r.Body).Decode(create) require.NoError(t, err) create.Spec.Cloud = flinkv2.PtrString(strings.ToUpper(create.Spec.GetCloud())) + create.Spec.DefaultPool = flinkv2.PtrBool(create.Spec.GetDefaultPool()) v := flinkv2.FcpmV2ComputePool{ Id: flinkv2.PtrString("lfcp-123456"), @@ -68,6 +69,34 @@ func handleFcpmComputePools(t *testing.T) http.HandlerFunc { } } +func handleFcpmComputePoolConfigs(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + config1 := flinkv2.FcpmV2OrgComputePoolConfig{ + Spec: &flinkv2.FcpmV2OrgComputePoolConfigSpec{ + DefaultPoolEnabled: flinkv2.PtrBool(false), + DefaultPoolMaxCfu: flinkv2.PtrInt32(50), + }, + } + err := json.NewEncoder(w).Encode(config1) + require.NoError(t, err) + case http.MethodPatch: + update := new(flinkv2.FcpmV2OrgComputePoolConfig) + err := json.NewDecoder(r.Body).Decode(update) + require.NoError(t, err) + config1 := flinkv2.FcpmV2OrgComputePoolConfig{ + Spec: &flinkv2.FcpmV2OrgComputePoolConfigSpec{ + DefaultPoolEnabled: flinkv2.PtrBool(update.Spec.GetDefaultPoolEnabled()), + DefaultPoolMaxCfu: flinkv2.PtrInt32(update.Spec.GetDefaultPoolMaxCfu()), + }, + } + err = json.NewEncoder(w).Encode(config1) + require.NoError(t, err) + } + } +} + func handleFcpmComputePoolsId(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var computePool flinkv2.FcpmV2ComputePool @@ -105,6 +134,7 @@ func handleFcpmComputePoolsId(t *testing.T) http.HandlerFunc { Spec: &flinkv2.FcpmV2ComputePoolSpec{ DisplayName: flinkv2.PtrString("my-compute-pool-1"), MaxCfu: flinkv2.PtrInt32(update.Spec.GetMaxCfu()), + DefaultPool: flinkv2.PtrBool(update.Spec.GetDefaultPool()), Cloud: flinkv2.PtrString("AWS"), Region: flinkv2.PtrString("eu-west-1"), Environment: &flinkv2.GlobalObjectReference{Id: "env-123"}, diff --git a/test/test-server/flink_gateway_router.go b/test/test-server/flink_gateway_router.go index da626fe3ae..65ac6b4c3d 100644 --- a/test/test-server/flink_gateway_router.go +++ b/test/test-server/flink_gateway_router.go @@ -174,7 +174,11 @@ func handleSqlEnvironmentsEnvironmentStatements(t *testing.T) http.HandlerFunc { require.NoError(t, err) statement.Metadata = &flinkgatewayv1.StatementObjectMeta{CreatedAt: flinkgatewayv1.PtrTime(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC))} - statement.Spec.ComputePoolId = flinkgatewayv1.PtrString(validFlinkStatementComputePoolId) + switch strings.ToLower(statement.GetName()) { + case "my-statement": + statement.Spec.ComputePoolId = flinkgatewayv1.PtrString(validFlinkStatementComputePoolId) + } + statement.Status = &flinkgatewayv1.SqlV1StatementStatus{Phase: "PENDING"} err = json.NewEncoder(w).Encode(statement)