Skip to content

Commit f171117

Browse files
authored
Merge pull request #75 from redpanda-data/topic-cfg-remove
topic config: support removal of state configs
2 parents a2681d7 + 1082975 commit f171117

File tree

4 files changed

+73
-20
lines changed

4 files changed

+73
-20
lines changed

examples/datasource/main.tf

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ data "redpanda_cluster" "test" {
77
id = var.cluster_id
88
}
99

10+
resource "redpanda_topic" "test" {
11+
name = var.topic_name
12+
partition_count = var.partition_count
13+
replication_factor = var.replication_factor
14+
cluster_api_url = data.redpanda_cluster.test.cluster_api_url
15+
allow_deletion = true
16+
configuration = var.topic_config
17+
}
18+
1019
resource "redpanda_user" "test" {
1120
name = var.user_name
1221
password = var.user_pw
@@ -25,6 +34,12 @@ resource "redpanda_acl" "test" {
2534
cluster_api_url = data.redpanda_cluster.test.cluster_api_url
2635
}
2736

37+
variable "topic_config" {
38+
default = {
39+
"cleanup.policy" = "compact"
40+
"compression.type" = "snappy"
41+
}
42+
}
2843
variable "user_name" {
2944
default = "test-username"
3045
}

redpanda/resources/topic/resource_topic.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,12 @@ func (t *Topic) Update(ctx context.Context, request resource.UpdateRequest, resp
256256
}
257257
defer t.dataplaneConn.Close()
258258
if !plan.Configuration.Equal(state.Configuration) {
259-
cfgToSet, err := utils.MapToUpdateTopicConfiguration(plan.Configuration)
259+
cfgToSet, err := parseUpdateConfiguration(state, plan)
260260
if err != nil {
261261
response.Diagnostics.AddError("failed to parse the configuration map", err.Error())
262262
return
263263
}
264-
cfgs, err := t.TopicClient.UpdateTopicConfigurations(ctx, &dataplanev1alpha1.UpdateTopicConfigurationsRequest{
264+
cfgs, err := t.TopicClient.SetTopicConfigurations(ctx, &dataplanev1alpha1.SetTopicConfigurationsRequest{
265265
TopicName: plan.Name.ValueString(),
266266
Configurations: cfgToSet,
267267
})
@@ -361,3 +361,41 @@ func filterDynamicConfig(configs []*dataplanev1alpha1.Topic_Configuration) []*da
361361
func isAlreadyExistsError(err error) bool {
362362
return strings.Contains(err.Error(), "TOPIC_ALREADY_EXISTS") || strings.Contains(err.Error(), "The topic has already been created")
363363
}
364+
365+
// parseUpdateConfiguration parses both the state and the plan configuration,
366+
// and generates a new SetConfiguration map with:
367+
// - Configurations that are both in the state, and the plan.
368+
// - Configurations that are in the plan and not in the state.
369+
// - Updated configurations that are in the plan.
370+
func parseUpdateConfiguration(state, plan models.Topic) ([]*dataplanev1alpha1.SetTopicConfigurationsRequest_SetConfiguration, error) {
371+
var output []*dataplanev1alpha1.SetTopicConfigurationsRequest_SetConfiguration
372+
// Loop through state configuration.
373+
for k, v := range state.Configuration.Elements() {
374+
if v.IsNull() || v.IsUnknown() {
375+
return nil, fmt.Errorf("topic configuration %q must have a value", k)
376+
}
377+
// If configuration also exists in plan.
378+
if pv, ok := plan.Configuration.Elements()[k]; ok {
379+
// Add whatever is in the plan to the output.
380+
value := strings.Trim(pv.String(), `"`)
381+
output = append(output, &dataplanev1alpha1.SetTopicConfigurationsRequest_SetConfiguration{
382+
Name: k,
383+
Value: &value,
384+
})
385+
}
386+
// If it's in the state, but not in the plan, then the
387+
// config was removed. We do not add that.
388+
}
389+
// Now we check for what's in the plan and not in the state.
390+
for k, v := range plan.Configuration.Elements() {
391+
// Add to output if it's not in the state.
392+
if _, ok := state.Configuration.Elements()[k]; !ok {
393+
value := strings.Trim(v.String(), `"`)
394+
output = append(output, &dataplanev1alpha1.SetTopicConfigurationsRequest_SetConfiguration{
395+
Name: k,
396+
Value: &value,
397+
})
398+
}
399+
}
400+
return output, nil
401+
}

redpanda/tests/acceptance_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
networkResourceName = "redpanda_network.test"
2929
clusterResourceName = "redpanda_cluster.test"
3030
userResourceName = "redpanda_user.test"
31+
topicResourceName = "redpanda_topic.test"
3132
aclResourceName = "redpanda_acl.test"
3233
)
3334

@@ -604,6 +605,14 @@ func TestAccResourcesWithDataSources(t *testing.T) {
604605
maps.Copy(origTestCaseVars, providerCfgIDSecretVars)
605606
origTestCaseVars["cluster_id"] = config.StringVariable(os.Getenv("CLUSTER_ID"))
606607
origTestCaseVars["user_name"] = config.StringVariable(name)
608+
origTestCaseVars["topic_name"] = config.StringVariable(name)
609+
610+
updateTestCaseVars := make(map[string]config.Variable)
611+
maps.Copy(updateTestCaseVars, origTestCaseVars)
612+
// Change 1, remove other
613+
updateTestCaseVars["topic_config"] = config.MapVariable(map[string]config.Variable{
614+
"compression.type": config.StringVariable("gzip"),
615+
})
607616

608617
c, err := newClients(ctx, clientID, clientSecret, "ign")
609618
if err != nil {
@@ -618,6 +627,15 @@ func TestAccResourcesWithDataSources(t *testing.T) {
618627
ProtoV6ProviderFactories: testAccProtoV6ProviderFactories,
619628
Check: resource.ComposeAggregateTestCheckFunc(
620629
resource.TestCheckResourceAttr(userResourceName, "name", name),
630+
resource.TestCheckResourceAttr(topicResourceName, "name", name),
631+
),
632+
},
633+
{
634+
ConfigFile: config.StaticFile(dataSourcesTest),
635+
ConfigVariables: updateTestCaseVars,
636+
ProtoV6ProviderFactories: testAccProtoV6ProviderFactories,
637+
Check: resource.ComposeAggregateTestCheckFunc(
638+
resource.TestCheckResourceAttr(topicResourceName, "configuration.compression.type", "gzip"),
621639
),
622640
},
623641
{

redpanda/utils/utils.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -335,24 +335,6 @@ func MapToCreateTopicConfiguration(cfg types.Map) ([]*dataplanev1alpha1.CreateTo
335335
return output, nil
336336
}
337337

338-
// MapToUpdateTopicConfiguration converts a cfg map to a slice of dataplanev1alpha1.UpdateTopicConfigurationsRequest_UpdateConfiguration
339-
func MapToUpdateTopicConfiguration(cfg types.Map) ([]*dataplanev1alpha1.UpdateTopicConfigurationsRequest_UpdateConfiguration, error) {
340-
var output []*dataplanev1alpha1.UpdateTopicConfigurationsRequest_UpdateConfiguration
341-
342-
for k, v := range cfg.Elements() {
343-
if v.IsNull() || v.IsUnknown() {
344-
return nil, fmt.Errorf("topic configuration %q must have a value", k)
345-
}
346-
value := strings.Trim(v.String(), `"`)
347-
output = append(output, &dataplanev1alpha1.UpdateTopicConfigurationsRequest_UpdateConfiguration{
348-
Name: k,
349-
Value: &value,
350-
Operation: dataplanev1alpha1.ConfigAlterOperation_CONFIG_ALTER_OPERATION_SET,
351-
})
352-
}
353-
return output, nil
354-
}
355-
356338
// NumberToInt32 converts a types.Number to an *int32
357339
func NumberToInt32(n types.Number) *int32 {
358340
i, _ := n.ValueBigFloat().Int64()

0 commit comments

Comments
 (0)