-
Notifications
You must be signed in to change notification settings - Fork 0
/
configuration.go
125 lines (115 loc) · 4.28 KB
/
configuration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package kafkauniverse
import (
"errors"
"time"
)
// KafkaClusterRepresentation struct
type KafkaClusterRepresentation struct {
ID *string `mapstructure:"id"`
Enabled *bool `mapstructure:"enabled"`
Version *string `mapstructure:"version"`
TLSEnabled *bool `mapstructure:"tls-enabled"`
SaramaLogEnabled *bool `mapstructure:"sarama-log-enabled"`
Brokers []string `mapstructure:"brokers"`
Security *KafkaSecurityRepresentation `mapstructure:"security"`
Producers []KafkaProducerRepresentation `mapstructure:"producers"`
Consumers []KafkaConsumerRepresentation `mapstructure:"consumers"`
}
// KafkaSecurityRepresentation struct
type KafkaSecurityRepresentation struct {
ClientID *string `mapstructure:"client-id"`
ClientSecret *string `mapstructure:"client-secret"`
TokenURL *string `mapstructure:"token-url"`
}
// KafkaProducerRepresentation struct
type KafkaProducerRepresentation struct {
ID *string `mapstructure:"id"`
Enabled *bool `mapstructure:"enabled"`
Topic *string `mapstructure:"topic"`
}
// KafkaConsumerRepresentation struct
type KafkaConsumerRepresentation struct {
ID *string `mapstructure:"id"`
Enabled *bool `mapstructure:"enabled"`
Topic *string `mapstructure:"topic"`
ConsumerGroupName *string `mapstructure:"consumer-group-name"`
FailureProducer *string `mapstructure:"failure-producer"`
ConsumptionDelay *time.Duration `mapstructure:"consumption-delay"`
}
// Validate validates a KafkaClusterRepresentation instance
func (kcr *KafkaClusterRepresentation) Validate() error {
var err error
if kcr.ID == nil || *kcr.ID == "" {
return errors.New("cluster ID should be set and not empty")
}
if kcr.Version == nil || *kcr.Version == "" {
return errors.New("cluster Version should be set and not empty")
}
if len(kcr.Brokers) == 0 {
return errors.New("cluster brokers should contain at least one broker")
}
for _, broker := range kcr.Brokers {
if broker == "" {
return errors.New("cluster broker value should not be empty")
}
}
if kcr.Security == nil {
return errors.New("cluster is missing security configuration")
}
if err = kcr.Security.Validate(); err != nil {
return err
}
if len(kcr.Producers)+len(kcr.Consumers) == 0 {
return errors.New("do you really need to configure a cluster with neither producer nor consumer?")
}
for _, producer := range kcr.Producers {
if err = producer.Validate(); err != nil {
return err
}
}
for _, consumer := range kcr.Consumers {
if err = consumer.Validate(); err != nil {
return err
}
}
return nil
}
// Validate validates a KafkaSecurityRepresentation instance
func (ksr *KafkaSecurityRepresentation) Validate() error {
if ksr.ClientID == nil || *ksr.ClientID == "" {
return errors.New("client-id is mandatory and should not be empty")
}
if ksr.ClientSecret == nil || *ksr.ClientSecret == "" {
return errors.New("client-secret is mandatory and should not be empty")
}
if ksr.TokenURL == nil || *ksr.TokenURL == "" {
return errors.New("token-url is mandatory and should not be empty")
}
return nil
}
// Validate validates a KafkaProducerRepresentation instance
func (kpr *KafkaProducerRepresentation) Validate() error {
if kpr.ID == nil || *kpr.ID == "" {
return errors.New("producer id is mandatory and should not be empty")
}
if kpr.Topic == nil || *kpr.Topic == "" {
return errors.New("producer topic is mandatory and should not be empty")
}
return nil
}
// Validate validates a KafkaConsumerRepresentation instance
func (kcr *KafkaConsumerRepresentation) Validate() error {
if kcr.ID == nil || *kcr.ID == "" {
return errors.New("consumer id is mandatory and should not be empty")
}
if kcr.Topic == nil || *kcr.Topic == "" {
return errors.New("consumer topic is mandatory and should not be empty")
}
if kcr.ConsumerGroupName == nil || *kcr.ConsumerGroupName == "" {
return errors.New("consumer group name is mandatory and should not be empty")
}
if kcr.FailureProducer != nil && *kcr.FailureProducer == "" {
return errors.New("consumer failure producer is optional but should not be empty")
}
return nil
}