Skip to content

Commit 1f15210

Browse files
authored
Merge pull request #1740 from openmeterio/refactor/topic-provisioning
refactor(kafka): topic provisioning
2 parents aa8a4f8 + cde8a86 commit 1f15210

File tree

10 files changed

+215
-14
lines changed

10 files changed

+215
-14
lines changed

app/common/kafka.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,12 @@ func NewKafkaTopicProvisionerConfig(
6565
settings config.TopicProvisionerConfig,
6666
) pkgkafka.TopicProvisionerConfig {
6767
return pkgkafka.TopicProvisionerConfig{
68-
AdminClient: adminClient,
69-
Logger: logger,
70-
Meter: meter,
71-
CacheSize: settings.CacheSize,
72-
CacheTTL: settings.CacheTTL,
68+
AdminClient: adminClient,
69+
Logger: logger,
70+
Meter: meter,
71+
CacheSize: settings.CacheSize,
72+
CacheTTL: settings.CacheTTL,
73+
ProtectedTopics: settings.ProtectedTopics,
7374
}
7475
}
7576

app/common/openmeter.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,13 @@ func NewIngestCollector(
123123
func NewKafkaNamespaceHandler(
124124
topicResolver topicresolver.Resolver,
125125
topicProvisioner pkgkafka.TopicProvisioner,
126-
conf config.Configuration,
126+
conf config.KafkaIngestConfiguration,
127127
) (*kafkaingest.NamespaceHandler, error) {
128128
return &kafkaingest.NamespaceHandler{
129129
TopicResolver: topicResolver,
130130
TopicProvisioner: topicProvisioner,
131-
Partitions: conf.Ingest.Kafka.Partitions,
131+
Partitions: conf.Partitions,
132+
DeletionEnabled: conf.NamespaceDeletionEnabled,
132133
}, nil
133134
}
134135

app/config/config_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ func TestComplete(t *testing.T) {
104104
TopicProvisionerConfig: TopicProvisionerConfig{
105105
CacheSize: 200,
106106
CacheTTL: 15 * time.Minute,
107+
ProtectedTopics: []string{
108+
"protected-topic-1",
109+
"protected-topic-2",
110+
},
107111
},
108112
},
109113
},

app/config/ingest.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ type KafkaIngestConfiguration struct {
3434

3535
Partitions int
3636
EventsTopicTemplate string
37+
38+
// NamespaceDeletionEnabled defines whether deleting namespaces are allowed or not.
39+
NamespaceDeletionEnabled bool
3740
}
3841

3942
// Validate validates the configuration.
@@ -166,6 +169,7 @@ func ConfigureIngest(v *viper.Viper) {
166169
v.SetDefault("ingest.kafka.saslPassword", "")
167170
v.SetDefault("ingest.kafka.partitions", 1)
168171
v.SetDefault("ingest.kafka.eventsTopicTemplate", "om_%s_events")
172+
v.SetDefault("ingest.kafka.namespaceDeletionEnabled", false)
169173

170174
ConfigureTopicProvisioner(v, "ingest", "kafka")
171175
}

app/config/testdata/complete.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ ingest:
4545
- consumer
4646
cacheSize: 200
4747
cacheTTL: 15m
48+
protectedTopics:
49+
- "protected-topic-1"
50+
- "protected-topic-2"
4851

4952
aggregation:
5053
clickhouse:

app/config/topicprovisioner.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ type TopicProvisionerConfig struct {
1616

1717
// The maximum time an entries is kept in cache before being evicted
1818
CacheTTL time.Duration
19+
20+
// ProtectedTopics defines a list of topics which are protected from deletion.
21+
ProtectedTopics []string
1922
}
2023

2124
func (c TopicProvisionerConfig) Validate() error {
@@ -38,4 +41,5 @@ func ConfigureTopicProvisioner(v *viper.Viper, prefixes ...string) {
3841

3942
v.SetDefault(prefixer("cacheSize"), 250)
4043
v.SetDefault(prefixer("cacheTTL"), "5m")
44+
v.SetDefault(prefixer("protectedTopics"), nil)
4145
}

cmd/server/wire_gen.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openmeter/ingest/kafkaingest/namespace.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ type NamespaceHandler struct {
1515
TopicProvisioner pkgkafka.TopicProvisioner
1616

1717
Partitions int
18+
19+
// DeletionEnabled defines whether deleting namespaces are allowed or not.
20+
DeletionEnabled bool
1821
}
1922

2023
// CreateNamespace implements the namespace handler interface.
@@ -41,6 +44,10 @@ func (h NamespaceHandler) CreateNamespace(ctx context.Context, namespace string)
4144

4245
// DeleteNamespace implements the namespace handler interface.
4346
func (h NamespaceHandler) DeleteNamespace(ctx context.Context, namespace string) error {
47+
if !h.DeletionEnabled {
48+
return nil
49+
}
50+
4451
if h.TopicResolver == nil {
4552
return errors.New("topic name resolver must not be nil")
4653
}

pkg/kafka/topicprovisioner.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,13 @@ type TopicProvisioner interface {
3737
DeProvision(ctx context.Context, topics ...string) error
3838
}
3939

40+
type AdminClient interface {
41+
CreateTopics(ctx context.Context, topics []kafka.TopicSpecification, options ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error)
42+
DeleteTopics(ctx context.Context, topics []string, options ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error)
43+
}
44+
4045
type TopicProvisionerConfig struct {
41-
AdminClient *kafka.AdminClient
46+
AdminClient AdminClient
4247
Logger *slog.Logger
4348
Meter metric.Meter
4449

@@ -49,6 +54,9 @@ type TopicProvisionerConfig struct {
4954
// CacheTTL stores maximum time an entries is kept in cache before being evicted.
5055
// Setting it to 0 disables cache entry expiration.
5156
CacheTTL time.Duration
57+
58+
// ProtectedTopics defines a list of topics which are protected from deletion.
59+
ProtectedTopics []string
5260
}
5361

5462
// NewTopicProvisioner returns a new TopicProvisioner.
@@ -65,9 +73,15 @@ func NewTopicProvisioner(config TopicProvisionerConfig) (TopicProvisioner, error
6573
return nil, errors.New("meter is required")
6674
}
6775

76+
protectedTopics := make(map[string]struct{}, len(config.ProtectedTopics))
77+
for _, protectedTopic := range config.ProtectedTopics {
78+
protectedTopics[protectedTopic] = struct{}{}
79+
}
80+
6881
provisioner := &topicProvisioner{
69-
client: config.AdminClient,
70-
logger: config.Logger,
82+
client: config.AdminClient,
83+
logger: config.Logger,
84+
protectedTopics: protectedTopics,
7185
}
7286

7387
provisioner.cache = expirable.NewLRU[string, struct{}](config.CacheSize, provisioner.evictCallback, config.CacheTTL)
@@ -128,10 +142,12 @@ var _ TopicProvisioner = (*topicProvisioner)(nil)
128142
// to keep track of topics which have been already provisioned. This allows the provisioner being called multiple times with the same
129143
// set of topics without need for extra round-trip to Kafka brokers for sub-sequential calls.
130144
type topicProvisioner struct {
131-
client *kafka.AdminClient
145+
client AdminClient
132146
logger *slog.Logger
133147
cache *expirable.LRU[string, struct{}]
134148

149+
protectedTopics map[string]struct{}
150+
135151
metrics struct {
136152
// Errors
137153
Errors metric.Int64Counter
@@ -233,13 +249,25 @@ func (p *topicProvisioner) DeProvision(ctx context.Context, topics ...string) er
233249
return nil
234250
}
235251

252+
topicsToDelete := make([]string, 0, len(topics))
236253
for _, topic := range topics {
237254
if topic == "" {
238-
return fmt.Errorf("invalid topic name %q", topic)
255+
p.logger.Warn("skip topic: empty topic name", "topic", topic)
256+
257+
continue
258+
}
259+
260+
// Skip protected topics to avoid accidental deletion
261+
if _, ok := p.protectedTopics[topic]; ok {
262+
p.logger.Info("skip topic: protected topic", "topic", topic)
263+
264+
continue
239265
}
266+
267+
topicsToDelete = append(topicsToDelete, topic)
240268
}
241269

242-
results, err := p.client.DeleteTopics(ctx, topics)
270+
results, err := p.client.DeleteTopics(ctx, topicsToDelete)
243271
if err != nil {
244272
p.metrics.Errors.Add(ctx, 1, metric.WithAttributes(attribute.String("scope", "client")))
245273

pkg/kafka/topicprovisioner_test.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"testing"
7+
"time"
8+
9+
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/otel/metric/noop"
13+
)
14+
15+
// FIXME(chrisgacsal): move discardHandler to 'testutils' pkg after import cycle is resolved.
16+
// discardHandler is a slog.Handler implementation which does not emit log messages
17+
// See: https://go-review.googlesource.com/c/go/+/548335/5/src/log/slog/example_discard_test.go#14
18+
type discardHandler struct {
19+
slog.JSONHandler
20+
}
21+
22+
func (d *discardHandler) Enabled(context.Context, slog.Level) bool { return false }
23+
24+
func NewDiscardLogger(t testing.TB) *slog.Logger {
25+
t.Helper()
26+
27+
return slog.New(&discardHandler{})
28+
}
29+
30+
var _ AdminClient = (*mockTopicProvisioner)(nil)
31+
32+
type mockTopicProvisioner struct {
33+
added []string
34+
removed []string
35+
}
36+
37+
func (m *mockTopicProvisioner) CreateTopics(_ context.Context, topics []kafka.TopicSpecification, _ ...kafka.CreateTopicsAdminOption) ([]kafka.TopicResult, error) {
38+
result := make([]kafka.TopicResult, 0, len(topics))
39+
40+
for _, topic := range topics {
41+
m.added = append(m.added, topic.Topic)
42+
43+
result = append(result, kafka.TopicResult{
44+
Topic: topic.Topic,
45+
Error: kafka.NewError(kafka.ErrNoError, "", false),
46+
})
47+
}
48+
49+
return result, nil
50+
}
51+
52+
func (m *mockTopicProvisioner) DeleteTopics(_ context.Context, topics []string, _ ...kafka.DeleteTopicsAdminOption) ([]kafka.TopicResult, error) {
53+
result := make([]kafka.TopicResult, 0, len(topics))
54+
55+
for _, topic := range topics {
56+
m.removed = append(m.removed, topic)
57+
58+
result = append(result, kafka.TopicResult{
59+
Topic: topic,
60+
Error: kafka.NewError(kafka.ErrNoError, "", false),
61+
})
62+
}
63+
64+
return result, nil
65+
}
66+
67+
func (m *mockTopicProvisioner) reset() {
68+
m.added, m.removed = []string{}, []string{}
69+
}
70+
71+
func TestTopicProvisioner(t *testing.T) {
72+
tests := []struct {
73+
Name string
74+
75+
AddTopics []TopicConfig
76+
RemoveTopics []string
77+
78+
ExpectedError error
79+
ExpectedAddedTopics []string
80+
ExpectedRemovedTopics []string
81+
}{
82+
{
83+
Name: "Add topics",
84+
AddTopics: []TopicConfig{
85+
{
86+
Name: "topic-1",
87+
Partitions: 1,
88+
},
89+
90+
{
91+
Name: "topic-2",
92+
Partitions: 1,
93+
},
94+
},
95+
ExpectedError: nil,
96+
ExpectedAddedTopics: []string{"topic-1", "topic-2"},
97+
ExpectedRemovedTopics: []string{},
98+
},
99+
{
100+
Name: "Remove topics",
101+
RemoveTopics: []string{"topic-1", "topic-2"},
102+
ExpectedError: nil,
103+
ExpectedAddedTopics: []string{},
104+
ExpectedRemovedTopics: []string{"topic-1", "topic-2"},
105+
},
106+
{
107+
Name: "Remove protected topics",
108+
RemoveTopics: []string{"protected-topic-1", "protected-topic-2"},
109+
ExpectedError: nil,
110+
ExpectedAddedTopics: []string{},
111+
ExpectedRemovedTopics: []string{},
112+
},
113+
}
114+
115+
adminClient := &mockTopicProvisioner{}
116+
meter := noop.NewMeterProvider().Meter("test")
117+
logger := NewDiscardLogger(t)
118+
119+
provisioner, err := NewTopicProvisioner(TopicProvisionerConfig{
120+
AdminClient: adminClient,
121+
Logger: logger,
122+
Meter: meter,
123+
CacheSize: 200,
124+
CacheTTL: 5 * time.Second,
125+
ProtectedTopics: []string{
126+
"protected-topic-1",
127+
"protected-topic-2",
128+
},
129+
})
130+
require.NoError(t, err, "initializing new topic provisioner should not fail")
131+
132+
for _, test := range tests {
133+
t.Run(test.Name, func(t *testing.T) {
134+
adminClient.reset()
135+
136+
ctx := context.TODO()
137+
138+
err = provisioner.Provision(ctx, test.AddTopics...)
139+
assert.NoError(t, err, "provisioning topics must not fail")
140+
141+
assert.ElementsMatch(t, test.ExpectedAddedTopics, adminClient.added, "provisioned topics must match")
142+
143+
err = provisioner.DeProvision(ctx, test.RemoveTopics...)
144+
assert.NoError(t, err, "de-provisioning topics must not fail")
145+
146+
assert.ElementsMatch(t, test.ExpectedRemovedTopics, adminClient.removed, "de-provisioned topics must match")
147+
})
148+
}
149+
}

0 commit comments

Comments
 (0)