Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
50ee46f
Add MaxEcku to create update and describe
cqin-confluent Aug 13, 2025
690a4f5
Add tests
cqin-confluent Aug 14, 2025
472e1ae
Update help test and fix linter
cqin-confluent Aug 14, 2025
1da3501
Improve test coverage and nit fix
cqin-confluent Aug 20, 2025
d911e52
Nit
cqin-confluent Aug 20, 2025
c53ceff
Fix linter
cqin-confluent Aug 21, 2025
5d2ea00
Merge branch 'main' of github.com:confluentinc/cli into CLI-3394
cqin-confluent Aug 22, 2025
8aa0a3c
Merge branch 'main' of github.com:confluentinc/cli into CLI-3394
cqin-confluent Sep 6, 2025
c406b5e
Add test cases
cqin-confluent Sep 6, 2025
1510d4e
Merge branch 'main' of github.com:confluentinc/cli into CLI-3394
cqin-confluent Sep 23, 2025
9aef256
Merge branch 'main' of github.com:confluentinc/cli into CLI-3394
cqin-confluent Sep 24, 2025
557bd9f
Fix update cluster issue with both type and max-ecku, update tests
cqin-confluent Sep 25, 2025
3625e24
Refactor update cluster command
cqin-confluent Sep 25, 2025
eccb9bc
Merge
cqin-confluent Oct 1, 2025
bb099ad
Address comments
cqin-confluent Oct 3, 2025
f9a6c8d
Merge branch 'main' of github.com:confluentinc/cli into CLI-3394
cqin-confluent Oct 14, 2025
4ddf543
Merge
cqin-confluent Dec 9, 2025
1b03518
Fix conflicts due to merge
cqin-confluent Dec 10, 2025
f0566aa
Fix linter
cqin-confluent Dec 10, 2025
8b3b3c2
Nit
cqin-confluent Dec 10, 2025
35e61ec
Address comments
cqin-confluent Dec 10, 2025
cda6e9e
Update test coverage
cqin-confluent Dec 11, 2025
e02b1e9
Forgot a few files
cqin-confluent Dec 11, 2025
26328ec
Add coverage
cqin-confluent Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/lint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ var vocabWords = []string{
"deserializer",
"deserializers",
"dns",
"ecku",
"elastic",
"env",
"eu",
Expand Down
22 changes: 22 additions & 0 deletions internal/kafka/command_cluster_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (c *clusterCommand) newCreateCommand() *cobra.Command {
pcmd.AddAvailabilityFlag(cmd)
pcmd.AddTypeFlag(cmd)
cmd.Flags().Int("cku", 0, `Number of Confluent Kafka Units (non-negative). Required for Kafka clusters of type "dedicated".`)
cmd.Flags().Int("max-ecku", 0, `Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. `+
`Kafka clusters with "HIGH" availability must have at least two eCKUs.`)
pcmd.AddByokKeyFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddNetworkFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
Expand Down Expand Up @@ -135,6 +137,26 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error {
Byok: keyGlobalObjectReference,
}}

if cmd.Flags().Changed("max-ecku") {
maxEcku, err := cmd.Flags().GetInt("max-ecku")
if err != nil {
return err
}
if clusterType == skuDedicated {
return errors.NewErrorWithSuggestions("the `--max-ecku` flag can only be used when creating a Basic, Standard, Enterprise, or Freight Kafka cluster", "Specify a different cluster with `--type` flag.")
}

if clusterType == skuBasic {
createCluster.Spec.Config.CmkV2Basic.MaxEcku = cmkv2.PtrInt32(int32(maxEcku))
} else if clusterType == skuStandard {
createCluster.Spec.Config.CmkV2Standard.MaxEcku = cmkv2.PtrInt32(int32(maxEcku))
} else if clusterType == skuEnterprise {
createCluster.Spec.Config.CmkV2Enterprise.MaxEcku = cmkv2.PtrInt32(int32(maxEcku))
} else if clusterType == skuFreight {
createCluster.Spec.Config.CmkV2Freight.MaxEcku = cmkv2.PtrInt32(int32(maxEcku))
}
}

if cmd.Flags().Changed("cku") {
cku, err := cmd.Flags().GetInt("cku")
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions internal/kafka/command_cluster_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type describeStruct struct {
ByokKeyId string `human:"BYOK Key ID" serialized:"byok_key_id"`
EncryptionKeyId string `human:"Encryption Key ID" serialized:"encryption_key_id"`
RestEndpoint string `human:"REST Endpoint" serialized:"rest_endpoint"`
MaxEcku int32 `human:"Max eCKU,omitempty" serialized:"max_ecku,omitempty"`
TopicCount int `human:"Topic Count,omitempty" serialized:"topic_count,omitempty"`
}

Expand Down Expand Up @@ -149,6 +150,7 @@ func convertClusterToDescribeStruct(cluster *cmkv2.CmkV2Cluster, usageLimits *ka
ByokKeyId: getCmkByokId(cluster),
EncryptionKeyId: getCmkEncryptionKey(cluster),
RestEndpoint: cluster.Spec.GetHttpEndpoint(),
MaxEcku: getCmkMaxEcku(cluster),
}

// Only set limits field if usage limits are available
Expand Down Expand Up @@ -191,6 +193,13 @@ func getKafkaClusterDescribeFields(cluster *cmkv2.CmkV2Cluster, basicFields []st
if cluster.Spec.Byok != nil {
describeFields = append(describeFields, "ByokId")
}
} else {
// Max eCKU field is available only for Basic, Standard, Enterprise, and Freight clusters
// Only show it if the value is positive (non-positive values like -1 indicate it's not set)
maxEcku := getCmkMaxEcku(cluster)
if maxEcku > 0 {
describeFields = append(describeFields, "MaxEcku")
}
}

if getTopicCount {
Expand Down
94 changes: 91 additions & 3 deletions internal/kafka/command_cluster_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,24 @@ func (c *clusterCommand) newUpdateCommand() *cobra.Command {
Text: `Update the type of a Kafka cluster from "Basic" to "Standard":`,
Code: `confluent kafka cluster update lkc-123456 --type "standard"`,
},
examples.Example{
Text: `Update the Max eCKU count of a Kafka cluster:`,
Code: `confluent kafka cluster update lkc-123456 --max-ecku 5`,
},
),
}

cmd.Flags().String("name", "", "Name of the Kafka cluster.")
cmd.Flags().Uint32("cku", 0, `Number of Confluent Kafka Units. For Kafka clusters of type "dedicated" only. When shrinking a cluster, you must reduce capacity one CKU at a time.`)
cmd.Flags().String("type", "", `Type of the Kafka cluster. Only supports upgrading from "Basic" to "Standard".`)
cmd.Flags().Int("max-ecku", 0, `Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. `+
`Kafka clusters with "HIGH" availability must have at least two eCKUs.`)
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cmd.MarkFlagsOneRequired("name", "cku", "type")
cmd.MarkFlagsOneRequired("name", "cku", "type", "max-ecku")

return cmd
}
Expand Down Expand Up @@ -91,7 +97,36 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
update.Spec.Config = &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{CmkV2Dedicated: &cmkv2.CmkV2Dedicated{Kind: "Dedicated", Cku: updatedCku}}
}

if cmd.Flags().Changed("type") {
if cmd.Flags().Changed("max-ecku") {
maxEcku, err := cmd.Flags().GetInt("max-ecku")
if err != nil {
return err
}
currentConfig := currentCluster.GetSpec().Config
if currentConfig.CmkV2Dedicated != nil {
return errors.NewErrorWithSuggestions("the `--max-ecku` flag can only be used when creating or updating a Basic, Standard, Enterprise, or Freight Kafka cluster", "Specify another cluster or use the `--cku` flag instead.")
}
if maxEcku < 1 {
return fmt.Errorf("`--max-ecku` value must be at least 1")
}

targetType := c.getCurrentClusterType(currentConfig)
if cmd.Flags().Changed("type") {
newType, err := cmd.Flags().GetString("type")
if err != nil {
return err
}
if newType == "" {
return fmt.Errorf("`--type` flag value must not be empty")
}
if currentConfig.CmkV2Basic == nil || strings.ToLower(newType) != "standard" {
return fmt.Errorf(`clusters can only be upgraded from "Basic" to "Standard"`)
}
targetType = "Standard"
}

update.Spec.Config = c.createClusterConfig(targetType, int32(maxEcku))
} else if cmd.Flags().Changed("type") {
newType, err := cmd.Flags().GetString("type")
if err != nil {
return err
Expand All @@ -100,12 +135,12 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
return fmt.Errorf("`--type` flag value must not be empty")
}

// Validate cluster type upgrade
currentConfig := currentCluster.GetSpec().Config
if currentConfig.CmkV2Basic == nil || strings.ToLower(newType) != "standard" {
return fmt.Errorf(`clusters can only be upgraded from "Basic" to "Standard"`)
}

// When upgrading type without specifying max-ecku, use default max-ecku value for Standard cluster returned by API
// Set the new cluster type
update.Spec.Config = &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
CmkV2Standard: &cmkv2.CmkV2Standard{
Expand Down Expand Up @@ -134,6 +169,59 @@ func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {
return c.outputKafkaClusterDescription(cmd, &updatedCluster, true, usageLimits)
}

func (c *clusterCommand) getCurrentClusterType(config *cmkv2.CmkV2ClusterSpecConfigOneOf) string {
if config.CmkV2Basic != nil {
return "Basic"
} else if config.CmkV2Standard != nil {
return "Standard"
} else if config.CmkV2Enterprise != nil {
return "Enterprise"
} else if config.CmkV2Freight != nil {
return "Freight"
}
return "Basic"
}

func (c *clusterCommand) createClusterConfig(clusterType string, maxEcku int32) *cmkv2.CmkV2ClusterSpecUpdateConfigOneOf {
switch clusterType {
case "Basic":
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
CmkV2Basic: &cmkv2.CmkV2Basic{
Kind: "Basic",
MaxEcku: cmkv2.PtrInt32(maxEcku),
},
}
case "Standard":
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
CmkV2Standard: &cmkv2.CmkV2Standard{
Kind: "Standard",
MaxEcku: cmkv2.PtrInt32(maxEcku),
},
}
case "Enterprise":
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
CmkV2Enterprise: &cmkv2.CmkV2Enterprise{
Kind: "Enterprise",
MaxEcku: cmkv2.PtrInt32(maxEcku),
},
}
case "Freight":
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
CmkV2Freight: &cmkv2.CmkV2Freight{
Kind: "Freight",
MaxEcku: cmkv2.PtrInt32(maxEcku),
},
}
default:
return &cmkv2.CmkV2ClusterSpecUpdateConfigOneOf{
CmkV2Basic: &cmkv2.CmkV2Basic{
Kind: "Basic",
MaxEcku: cmkv2.PtrInt32(maxEcku),
},
}
}
}

func (c *clusterCommand) validateResize(cku int32, currentCluster *cmkv2.CmkV2Cluster) (int32, error) {
// Ensure the cluster is a Dedicated Cluster
if currentCluster.GetSpec().Config.CmkV2Dedicated == nil {
Expand Down
38 changes: 21 additions & 17 deletions internal/kafka/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,6 @@ func getCmkClusterType(cluster *cmkv2.CmkV2Cluster) string {
return ccstructs.Sku_name[0] // UNKNOWN
}

func getCmkMaxEcku(cluster *cmkv2.CmkV2Cluster) int32 {
if isBasic(cluster) {
return cluster.Spec.Config.CmkV2Basic.GetMaxEcku()
}
if isStandard(cluster) {
return cluster.Spec.Config.CmkV2Standard.GetMaxEcku()
}
if isEnterprise(cluster) {
return cluster.Spec.Config.CmkV2Enterprise.GetMaxEcku()
}
if isFreight(cluster) {
return cluster.Spec.Config.CmkV2Freight.GetMaxEcku()
}

return -1
}

func getCmkClusterSize(cluster *cmkv2.CmkV2Cluster) int32 {
if isDedicated(cluster) {
return *cluster.Status.Cku
Expand All @@ -147,6 +130,27 @@ func getCmkClusterPendingSize(cluster *cmkv2.CmkV2Cluster) int32 {
return -1
}

func getCmkMaxEcku(cluster *cmkv2.CmkV2Cluster) int32 {
if isBasic(cluster) {
if cluster.GetSpec().Config.CmkV2Basic.MaxEcku != nil {
return cluster.GetSpec().Config.CmkV2Basic.GetMaxEcku()
}
} else if isStandard(cluster) {
if cluster.GetSpec().Config.CmkV2Standard.MaxEcku != nil {
return cluster.GetSpec().Config.CmkV2Standard.GetMaxEcku()
}
} else if isEnterprise(cluster) {
if cluster.GetSpec().Config.CmkV2Enterprise.MaxEcku != nil {
return cluster.GetSpec().Config.CmkV2Enterprise.GetMaxEcku()
}
} else if isFreight(cluster) {
if cluster.GetSpec().Config.CmkV2Freight.MaxEcku != nil {
return cluster.GetSpec().Config.CmkV2Freight.GetMaxEcku()
}
}
return -1
}

func getCmkByokId(cluster *cmkv2.CmkV2Cluster) string {
if isDedicated(cluster) && cluster.Spec.Byok != nil {
return cluster.Spec.Byok.Id
Expand Down
1 change: 1 addition & 0 deletions test/fixtures/output/kafka/1.golden
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Flags:
--availability string Specify the availability of the cluster as "single-zone", "multi-zone", "low", or "high". (default "single-zone")
--type string Specify the type of the Kafka cluster as "basic", "standard", "enterprise", "freight", or "dedicated". (default "basic")
--cku int Number of Confluent Kafka Units (non-negative). Required for Kafka clusters of type "dedicated".
--max-ecku int Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. Kafka clusters with "HIGH" availability must have at least two eCKUs.
--byok string Confluent Cloud Key ID of a registered encryption key (use "confluent byok create" to register a key).
--network string Network ID.
--context string CLI context name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ It may take up to 5 minutes for the Kafka cluster to be ready.
| ID | lkc-with-ecku-limits |
| Name | my-basic-cluster-with-ecku-limits |
| Type | BASIC |
| Ingress Limit (MB/s) | 25 |
| Egress Limit (MB/s) | 75 |
| Ingress Limit (MB/s) | 250 |
| Egress Limit (MB/s) | 750 |
| Storage | 5000 GB |
| Cloud | aws |
| Region | us-west-2 |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Error: the `--max-ecku` flag can only be used when creating a Basic, Standard, Enterprise, or Freight Kafka cluster

Suggestions:
Specify a different cluster with `--type` flag.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
It may take up to 5 minutes for the Kafka cluster to be ready.
+----------------------+---------------------------+
| Current | false |
| ID | lkc-def963 |
| Name | my-new-cluster |
| Type | ENTERPRISE |
| Ingress Limit (MB/s) | 240 |
| Egress Limit (MB/s) | 720 |
| Storage | Unlimited |
| Cloud | aws |
| Region | us-east-1 |
| Availability | multi-zone |
| Status | PROVISIONING |
| Endpoint | SASL_SSL://kafka-endpoint |
| REST Endpoint | https://pkc-endpoint |
| Max eCKU | 4 |
+----------------------+---------------------------+
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Error: at least one of the flags in the group [name cku type] is required
Error: at least one of the flags in the group [name cku type max-ecku] is required
Usage:
confluent kafka cluster update <id> [flags]

Expand All @@ -11,10 +11,15 @@ Update the type of a Kafka cluster from "Basic" to "Standard":

$ confluent kafka cluster update lkc-123456 --type "standard"

Update the Max eCKU count of a Kafka cluster:

$ confluent kafka cluster update lkc-123456 --max-ecku 5

Flags:
--name string Name of the Kafka cluster.
--cku uint32 Number of Confluent Kafka Units. For Kafka clusters of type "dedicated" only. When shrinking a cluster, you must reduce capacity one CKU at a time.
--type string Type of the Kafka cluster. Only supports upgrading from "Basic" to "Standard".
--max-ecku int Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. Kafka clusters with "HIGH" availability must have at least two eCKUs.
--context string CLI context name.
--environment string Environment ID.
--kafka-endpoint string Endpoint to be used for this Kafka cluster.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
It may take up to 5 minutes for the Kafka cluster to be ready.
+----------------------+---------------------------+
| Current | false |
| ID | lkc-def963 |
| Name | my-new-cluster |
| Type | FREIGHT |
| Ingress Limit (MB/s) | 180 |
| Egress Limit (MB/s) | 540 |
| Storage | Unlimited |
| Cloud | aws |
| Region | us-east-1 |
| Availability | low |
| Status | PROVISIONING |
| Endpoint | SASL_SSL://kafka-endpoint |
| REST Endpoint | https://pkc-endpoint |
| Max eCKU | 3 |
+----------------------+---------------------------+
1 change: 1 addition & 0 deletions test/fixtures/output/kafka/cluster/create-help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Flags:
--availability string Specify the availability of the cluster as "single-zone", "multi-zone", "low", or "high". (default "single-zone")
--type string Specify the type of the Kafka cluster as "basic", "standard", "enterprise", "freight", or "dedicated". (default "basic")
--cku int Number of Confluent Kafka Units (non-negative). Required for Kafka clusters of type "dedicated".
--max-ecku int Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. Kafka clusters with "HIGH" availability must have at least two eCKUs.
--byok string Confluent Cloud Key ID of a registered encryption key (use "confluent byok create" to register a key).
--network string Network ID.
--context string CLI context name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ It may take up to 5 minutes for the Kafka cluster to be ready.
| ID | lkc-with-ecku-limits |
| Name | my-standard-cluster-with-ecku-limits |
| Type | STANDARD |
| Ingress Limit (MB/s) | 125 |
| Egress Limit (MB/s) | 375 |
| Ingress Limit (MB/s) | 250 |
| Egress Limit (MB/s) | 750 |
| Storage | Unlimited |
| Cloud | aws |
| Region | us-west-2 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
| Status | UP |
| Endpoint | SASL_SSL://kafka-endpoint |
| REST Endpoint | http://127.0.0.1:1025 |
| Max eCKU | 5 |
| Topic Count | 2 |
+----------------------+-------------------------------+
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
| Status | UP |
| Endpoint | SASL_SSL://kafka-endpoint |
| REST Endpoint | http://127.0.0.1:1025 |
| Max eCKU | 5 |
| Topic Count | 2 |
+----------------------+---------------------------+
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Error: failed to update Kafka cluster: failed to update Kafka cluster: The specified Max eCKU exceeds the maximum allowed limit of 10 eCKUs for ENTERPRISE SKU

Suggestions:
A cluster can't be updated while still provisioning. If you just created this cluster, retry in a few minutes.
Loading