Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/confluentinc/cli/v4

go 1.24.6
go 1.24.7

require (
github.com/antihax/optional v1.0.0
Expand Down Expand Up @@ -28,9 +28,9 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.25.0
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0
github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0
github.com/confluentinc/ccloud-sdk-go-v2/identity-provider v0.3.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 h1:ISrVOX9qJ2Sxiu/fGBqqH
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0/go.mod h1:zHG/3DzsnoHC81B1AY9K/8bMX3mxbIp5/nHHdypa//w=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 h1:o1zKZlKbnN9uv+Y8TxwesBRryUl3lEU6lnfndEJigxQ=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9/go.mod h1:TtTcSfm+/JvnfqEKglOZ32LIcsRbdtrQdI+TtcP7fiU=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEtc/k3K11PX7f4Fj7sPFdo=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0 h1:gRRtad0RRit38+54vKg6DtUlTjPjsuKiVSs1fvyP0nk=
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0/go.mod h1:JHg9yHyCBLL0Zm24skG4pGaSR49IfxPJaQg/HXzMJpw=
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0 h1:DVWL3Y4b5azgCADubtyp3EhGZuyJkleINaTy2V3iius=
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.3.0/go.mod h1:P4fdIkI1ynjSvhDEGX283KhtzG51eGHQc5Cqtp7bu1Q=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 h1:8Y1uXjolI2d5mawcfLn4OfJ81WRMQpjMFWdBm3dLdrk=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0 h1:KzlhRDrUsXbs4ZPZy6T9OWmFIVkZWHxNsDorHHSnwFs=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.18.0/go.mod h1:CuvhIQpYj/LbQeYzp7Sw2LJkTKzLh8xlFdQoKq9ZQlY=
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0 h1:37Gjdo+0Ev3g2NPEXyiVm7yTT85AlWbjXYRLvq6Aj9E=
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.15.0/go.mod h1:jnWqax4kM22sutPGMtGmHqe2usgfqYig4UtmHsLENz0=
github.com/confluentinc/ccloud-sdk-go-v2/iam-ip-filtering v0.5.0 h1:xD7CXcyAqezFnSVFB4U27oWUY4FlbyciVP0ftDIiI18=
Expand Down
1 change: 1 addition & 0 deletions internal/flink/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {

// Cloud Specific Commands
cmd.AddCommand(c.newArtifactCommand())
cmd.AddCommand(c.newComputePoolConfigCommand())
cmd.AddCommand(c.newConnectionCommand())
cmd.AddCommand(c.newConnectivityTypeCommand())
cmd.AddCommand(c.newEndpointCommand())
Expand Down
1 change: 1 addition & 0 deletions internal/flink/command_compute_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type computePoolOut struct {
Environment string `human:"Environment" serialized:"environment"`
CurrentCfu int32 `human:"Current CFU" serialized:"currrent_cfu"`
MaxCfu int32 `human:"Max CFU" serialized:"max_cfu"`
DefaultPool bool `human:"Default Pool" serialized:"default_pool"`
Cloud string `human:"Cloud" serialized:"cloud"`
Region string `human:"Region" serialized:"region"`
Status string `human:"Status" serialized:"status"`
Expand Down
22 changes: 22 additions & 0 deletions internal/flink/command_compute_pool_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package flink

import (
"github.com/spf13/cobra"
)

type computePoolConfigOut struct {
DefaultPoolEnabled bool `human:"Default Pool Enabled" serialized:"default_pool_enabled"`
DefaultPoolMaxCFU int32 `human:"Default Pool Max CFU" serialized:"default_pool_max_cfu"`
}

func (c *command) newComputePoolConfigCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "compute-pool-config",
Short: "Manage Flink compute pools configs.",
}

cmd.AddCommand(c.newComputePoolConfigDescribeCommand())
cmd.AddCommand(c.newComputePoolConfigUpdateCommand())

return cmd
}
34 changes: 34 additions & 0 deletions internal/flink/command_compute_pool_config_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newComputePoolConfigDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe",
Short: "Describe a Flink compute pool config.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
RunE: c.computePoolConfigDescribe,
}

pcmd.AddOutputFlag(cmd)
return cmd
}

func (c *command) computePoolConfigDescribe(cmd *cobra.Command, args []string) error {
computePoolConfig, err := c.V2Client.DescribeFlinkComputePoolConfig()
if err != nil {
return err
}

table := output.NewTable(cmd)
table.Add(&computePoolConfigOut{
DefaultPoolEnabled: computePoolConfig.Spec.GetDefaultPoolEnabled(),
DefaultPoolMaxCFU: computePoolConfig.Spec.GetDefaultPoolMaxCfu(),
})
return table.Print()
}
79 changes: 79 additions & 0 deletions internal/flink/command_compute_pool_config_update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package flink

import (
"github.com/spf13/cobra"

flinkv2 "github.com/confluentinc/ccloud-sdk-go-v2/flink/v2"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/examples"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newComputePoolConfigUpdateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "update",
Short: "Update a Flink compute pool config.",
Args: cobra.MaximumNArgs(1),
ValidArgsFunction: pcmd.NewValidArgsFunction(c.validComputePoolArgs),
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
RunE: c.computePoolConfigUpdate,
Example: examples.BuildExampleString(
examples.Example{
Text: `Update CFU count of a Flink compute pool config.`,
Code: `confluent flink compute-pool update --max-cfu 5`,
},
),
}

cmd.Flags().Int32("max-cfu", -1, "Maximum number of Confluent Flink Units (CFUs) that default compute pools in this organization should auto-scale to.")
cmd.Flags().Bool("default-pool", false, "Whether default compute pools are enabled for the organization.")
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cmd.MarkFlagsOneRequired("max-cfu", "default-pool")

return cmd
}

func (c *command) computePoolConfigUpdate(cmd *cobra.Command, args []string) error {
computePoolConfig, err := c.V2Client.DescribeFlinkComputePoolConfig()
if err != nil {
return err
}

update := flinkv2.FcpmV2OrgComputePoolConfigUpdate{
Spec: &flinkv2.FcpmV2OrgComputePoolConfigSpec{
DefaultPoolEnabled: flinkv2.PtrBool(computePoolConfig.Spec.GetDefaultPoolEnabled()),
DefaultPoolMaxCfu: flinkv2.PtrInt32(computePoolConfig.Spec.GetDefaultPoolMaxCfu()),
},
}

if cmd.Flags().Changed("default-pool") {
defaultPool, err := cmd.Flags().GetBool("default-pool")
if err != nil {
return err
}
update.Spec.DefaultPoolEnabled = flinkv2.PtrBool(defaultPool)
}

maxCfu, err := cmd.Flags().GetInt32("max-cfu")
if err != nil {
return err
}
if maxCfu != -1 {
update.Spec.DefaultPoolMaxCfu = flinkv2.PtrInt32(maxCfu)
}

updatedComputePoolConfig, err := c.V2Client.UpdateFlinkComputePoolConfig(update)
if err != nil {
return err
}

table := output.NewTable(cmd)
table.Add(&computePoolConfigOut{
DefaultPoolEnabled: updatedComputePoolConfig.Spec.GetDefaultPoolEnabled(),
DefaultPoolMaxCFU: updatedComputePoolConfig.Spec.GetDefaultPoolMaxCfu(),
})
return table.Print()
}
8 changes: 8 additions & 0 deletions internal/flink/command_compute_pool_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (c *command) newComputePoolCreateCommand() *cobra.Command {
pcmd.AddCloudFlag(cmd)
pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)
cmd.Flags().Int32("max-cfu", 5, "Maximum number of Confluent Flink Units (CFU).")
cmd.Flags().Bool("default-pool", false, "Indicate whether the Flink compute pool is a default compute pool or not.")
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

Expand All @@ -53,6 +54,11 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error {
return err
}

isDefault, err := cmd.Flags().GetBool("default-pool")
if err != nil {
return err
}

environmentId, err := c.Context.EnvironmentId()
if err != nil {
return err
Expand All @@ -68,6 +74,7 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error {
Cloud: flinkv2.PtrString(cloud),
Region: flinkv2.PtrString(region),
MaxCfu: flinkv2.PtrInt32(maxCfu),
DefaultPool: flinkv2.PtrBool(isDefault),
Environment: &flinkv2.GlobalObjectReference{
Id: environmentId,
Related: environment.Metadata.GetSelf(),
Expand All @@ -88,6 +95,7 @@ func (c *command) computePoolCreate(cmd *cobra.Command, args []string) error {
Environment: computePool.Spec.Environment.GetId(),
CurrentCfu: computePool.Status.GetCurrentCfu(),
MaxCfu: computePool.Spec.GetMaxCfu(),
DefaultPool: computePool.Spec.GetDefaultPool(),
Cloud: computePool.Spec.GetCloud(),
Region: computePool.Spec.GetRegion(),
Status: computePool.Status.GetPhase(),
Expand Down
1 change: 1 addition & 0 deletions internal/flink/command_compute_pool_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (c *command) computePoolDescribe(cmd *cobra.Command, args []string) error {
Environment: computePool.Spec.Environment.GetId(),
CurrentCfu: computePool.Status.GetCurrentCfu(),
MaxCfu: computePool.Spec.GetMaxCfu(),
DefaultPool: computePool.Spec.GetDefaultPool(),
Cloud: computePool.Spec.GetCloud(),
Region: computePool.Spec.GetRegion(),
Status: computePool.Status.GetPhase(),
Expand Down
1 change: 1 addition & 0 deletions internal/flink/command_compute_pool_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (c *command) computePoolList(cmd *cobra.Command, _ []string) error {
Environment: computePool.Spec.Environment.GetId(),
CurrentCfu: computePool.Status.GetCurrentCfu(),
MaxCfu: computePool.Spec.GetMaxCfu(),
DefaultPool: computePool.Spec.GetDefaultPool(),
Cloud: computePool.Spec.GetCloud(),
Region: computePool.Spec.GetRegion(),
Status: computePool.Status.GetPhase(),
Expand Down
12 changes: 11 additions & 1 deletion internal/flink/command_compute_pool_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ func (c *command) newComputePoolUpdateCommand() *cobra.Command {

cmd.Flags().String("name", "", "Name of the compute pool.")
cmd.Flags().Int32("max-cfu", 0, "Maximum number of Confluent Flink Units (CFU).")
cmd.Flags().Bool("default-pool", false, "Indicate whether the Flink compute pool is a default compute pool or not.")
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cmd.MarkFlagsOneRequired("name", "max-cfu")
cmd.MarkFlagsOneRequired("name", "max-cfu", "default-pool")

return cmd
}
Expand Down Expand Up @@ -92,6 +93,14 @@ func (c *command) computePoolUpdate(cmd *cobra.Command, args []string) error {
update.Spec.DisplayName = flinkv2.PtrString(name)
}

if cmd.Flags().Changed("default-pool") {
defaultPool, err := cmd.Flags().GetBool("default-pool")
if err != nil {
return err
}
update.Spec.DefaultPool = flinkv2.PtrBool(defaultPool)
}

updatedComputePool, err := c.V2Client.UpdateFlinkComputePool(id, update)
if err != nil {
return err
Expand All @@ -105,6 +114,7 @@ func (c *command) computePoolUpdate(cmd *cobra.Command, args []string) error {
Environment: updatedComputePool.Spec.Environment.GetId(),
CurrentCfu: computePool.Status.GetCurrentCfu(),
MaxCfu: updatedComputePool.Spec.GetMaxCfu(),
DefaultPool: updatedComputePool.Spec.GetDefaultPool(),
Cloud: computePool.Spec.GetCloud(),
Region: computePool.Spec.GetRegion(),
Status: computePool.Status.GetPhase(),
Expand Down
54 changes: 41 additions & 13 deletions internal/flink/command_shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config)
cmd.RunE = func(cmd *cobra.Command, args []string) error {
return c.startFlinkSqlClient(prerunner, cmd)
}
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
c.addComputePoolFlag(cmd)
pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand)
c.addDatabaseFlag(cmd)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddCloudFlag(cmd)
pcmd.AddRegionFlagFlink(cmd, c.AuthenticatedCLICommand)

if featureflags.Manager.BoolVariation("cli.flink.internal", cfg.Context(), config.CliLaunchDarklyClient, true, false) {
cmd.Flags().StringSlice("config-key", []string{}, "App option keys for local mode.")
Expand All @@ -57,15 +59,14 @@ func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config)
cmd.RunE = func(cmd *cobra.Command, args []string) error {
return c.startFlinkSqlClientOnPrem(prerunner, cmd)
}
cmd.Flags().String("compute-pool", "", "The compute pool name to execute the Flink SQL statement.")
cmd.Flags().String("environment", "", "Name of the Flink environment.")
cmd.Flags().String("compute-pool", "", "The compute pool name to execute the Flink SQL statement.")
cmd.Flags().String("catalog", "", "The name of the default catalog.")
cmd.Flags().String("database", "", "The name of the default database.")
cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration.")
addCmfFlagSet(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("environment"))
cobra.CheckErr(cmd.MarkFlagRequired("compute-pool"))
}

return cmd
Expand All @@ -83,9 +84,20 @@ func (c *command) authenticated(authenticated func(*cobra.Command, []string) err
return err
}

flinkGatewayClient, err := c.GetFlinkGatewayClient(true)
if err != nil {
return err
var flinkGatewayClient *ccloudv2.FlinkGatewayClient
var errClient error
computePool := c.Context.GetCurrentFlinkComputePool()

if computePool == "" {
flinkGatewayClient, errClient = c.GetFlinkGatewayClient(false)
if errClient != nil {
return errClient
}
} else {
flinkGatewayClient, errClient = c.GetFlinkGatewayClient(true)
if errClient != nil {
return errClient
}
}

jwtCtx := &config.Context{State: &config.ContextState{AuthToken: flinkGatewayClient.AuthToken}}
Expand Down Expand Up @@ -168,12 +180,20 @@ func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Comma
catalog = environment.GetDisplayName()
}

cloud, err := cmd.Flags().GetString("cloud")
if err != nil {
return err
}
region, err := cmd.Flags().GetString("region")
if err != nil {
return err
}

computePool := c.Context.GetCurrentFlinkComputePool()
if computePool == "" {
return errors.NewErrorWithSuggestions(
"no compute pool selected",
"Select a compute pool with `confluent flink compute-pool use` or `--compute-pool`.",
)
if cloud == "" || region == "" {
return errors.New("Flink cloud and region flags are required when compute pool is not specified.")
}
}

serviceAccount, err := cmd.Flags().GetString("service-account")
Expand Down Expand Up @@ -201,9 +221,17 @@ func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Comma
return err
}

flinkGatewayClient, err := c.GetFlinkGatewayClient(true)
if err != nil {
return err
var flinkGatewayClient *ccloudv2.FlinkGatewayClient
if computePool == "" {
flinkGatewayClient, err = c.GetFlinkGatewayClient(false)
if err != nil {
return err
}
} else {
flinkGatewayClient, err = c.GetFlinkGatewayClient(true)
if err != nil {
return err
}
}

lspBaseUrl, err := c.getFlinkLanguageServiceUrl(flinkGatewayClient)
Expand Down
2 changes: 1 addition & 1 deletion internal/flink/command_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type statementOut struct {
CreationDate time.Time `human:"Creation Date" serialized:"creation_date"`
Name string `human:"Name" serialized:"name"`
Statement string `human:"Statement" serialized:"statement"`
ComputePool string `human:"Compute Pool" serialized:"compute_pool"`
ComputePool string `human:"Compute Pool,omitempty" serialized:"compute_pool,omitempty"`
Status string `human:"Status" serialized:"status"`
StatusDetail string `human:"Status Detail,omitempty" serialized:"status_detail,omitempty"`
LatestOffsets map[string]string `human:"Latest Offsets" serialized:"latest_offsets"`
Expand Down
Loading