@@ -3,44 +3,93 @@ package kafkacontroller
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "github.com/exoscale/egoscale/v2/oapi"
6
7
"strings"
7
8
9
+ exoscalesdk "github.com/exoscale/egoscale/v2"
8
10
exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
11
+ "github.com/vshn/provider-exoscale/operator/pipelineutil"
9
12
"github.com/vshn/provider-exoscale/operator/webhook"
10
13
ctrl "sigs.k8s.io/controller-runtime"
14
+ "sigs.k8s.io/controller-runtime/pkg/client"
11
15
12
16
"github.com/go-logr/logr"
13
17
"k8s.io/apimachinery/pkg/runtime"
14
18
)
15
19
16
- var admittedVersions = [] string { "3.2" }
20
+ const serviceType = "kafka"
17
21
18
22
// SetupWebhook adds a webhook for kafka resources.
19
23
func SetupWebhook (mgr ctrl.Manager ) error {
20
24
return ctrl .NewWebhookManagedBy (mgr ).
21
25
For (& exoscalev1.Kafka {}).
22
26
WithValidator (& Validator {
23
- log : mgr .GetLogger ().WithName ("webhook" ).WithName (strings .ToLower (exoscalev1 .KafkaKind )),
27
+ log : mgr .GetLogger ().WithName ("webhook" ).WithName (strings .ToLower (exoscalev1 .KafkaKind )),
28
+ kube : mgr .GetClient (),
24
29
}).
25
30
Complete ()
26
31
}
27
32
28
33
// Validator validates kafka admission requests.
29
34
type Validator struct {
30
- log logr.Logger
35
+ log logr.Logger
36
+ kube client.Client
31
37
}
32
38
33
39
// ValidateCreate validates the spec of a created kafka resource.
34
- func (v * Validator ) ValidateCreate (_ context.Context , obj runtime.Object ) error {
40
+ func (v * Validator ) ValidateCreate (ctx context.Context , obj runtime.Object ) error {
41
+ instance := obj .(* exoscalev1.Kafka )
42
+ v .log .V (1 ).Info ("get kafka available versions" )
43
+ exo , err := pipelineutil .OpenExoscaleClient (ctx , v .kube , instance .GetProviderConfigName (), exoscalesdk .ClientOptWithAPIEndpoint (fmt .Sprintf ("https://api-%s.exoscale.com" , instance .Spec .ForProvider .Zone )))
44
+ if err != nil {
45
+ return fmt .Errorf ("open exoscale client failed: %w" , err )
46
+ }
47
+ return v .validateCreateWithExoClient (ctx , obj , exo .Exoscale )
48
+ }
49
+
50
+ func (v * Validator ) validateCreateWithExoClient (ctx context.Context , obj runtime.Object , exo oapi.ClientWithResponsesInterface ) error {
35
51
instance , ok := obj .(* exoscalev1.Kafka )
36
52
if ! ok {
37
53
return fmt .Errorf ("invalid managed resource type %T for kafka webhook" , obj )
38
54
}
39
55
v .log .V (2 ).WithValues ("instance" , instance ).Info ("validate create" )
40
56
57
+ availableVersions , err := v .getAvailableVersions (ctx , exo )
58
+ if err != nil {
59
+ return err
60
+ }
61
+
62
+ err = v .validateVersion (ctx , obj , * availableVersions )
63
+ if err != nil {
64
+ return err
65
+ }
66
+
41
67
return validateSpec (instance .Spec .ForProvider )
42
68
}
43
69
70
+ func (v * Validator ) getAvailableVersions (ctx context.Context , exo oapi.ClientWithResponsesInterface ) (* []string , error ) {
71
+ // get kafka available versions
72
+ resp , err := exo .GetDbaasServiceTypeWithResponse (ctx , serviceType )
73
+ if err != nil {
74
+ return nil , fmt .Errorf ("get DBaaS service type failed: %w" , err )
75
+ }
76
+
77
+ v .log .V (1 ).Info ("DBaaS service type" , "body" , string (resp .Body ))
78
+
79
+ serviceType := * resp .JSON200
80
+ if serviceType .AvailableVersions == nil {
81
+ return nil , fmt .Errorf ("kafka available versions not found" )
82
+ }
83
+ return serviceType .AvailableVersions , nil
84
+ }
85
+
86
+ func (v * Validator ) validateVersion (_ context.Context , obj runtime.Object , availableVersions []string ) error {
87
+ instance := obj .(* exoscalev1.Kafka )
88
+
89
+ v .log .V (1 ).Info ("validate version" )
90
+ return webhook .ValidateVersions (instance .Spec .ForProvider .Version , availableVersions )
91
+ }
92
+
44
93
// ValidateUpdate validates the spec of an updated kafka resource and checks that no immutable field has been modified.
45
94
func (v * Validator ) ValidateUpdate (_ context.Context , oldObj , newObj runtime.Object ) error {
46
95
newInstance , ok := newObj .(* exoscalev1.Kafka )
@@ -67,11 +116,7 @@ func (v *Validator) ValidateDelete(_ context.Context, obj runtime.Object) error
67
116
}
68
117
69
118
func validateSpec (params exoscalev1.KafkaParameters ) error {
70
- err := validateVersion (params )
71
- if err != nil {
72
- return err
73
- }
74
- err = validateIpFilter (params )
119
+ err := validateIpFilter (params )
75
120
if err != nil {
76
121
return err
77
122
}
@@ -82,10 +127,6 @@ func validateSpec(params exoscalev1.KafkaParameters) error {
82
127
return validateKafkaSettings (params )
83
128
}
84
129
85
- func validateVersion (obj exoscalev1.KafkaParameters ) error {
86
- return webhook .ValidateVersions (obj .Version , admittedVersions )
87
- }
88
-
89
130
func validateIpFilter (params exoscalev1.KafkaParameters ) error {
90
131
if len (params .IPFilter ) == 0 {
91
132
return fmt .Errorf ("IP filter cannot be empty" )
@@ -121,10 +162,6 @@ func compareVersion(oldInst, newInst exoscalev1.Kafka) error {
121
162
if oldInst .Spec .ForProvider .Version == newInst .Spec .ForProvider .Version {
122
163
return nil
123
164
}
124
- if newInst .Spec .ForProvider .Version == "" {
125
- // Setting version to empty string should always be fine
126
- return nil
127
- }
128
165
if oldInst .Spec .ForProvider .Version == "" {
129
166
// Fall back to reported version if no version was set before
130
167
oldInst .Spec .ForProvider .Version = oldInst .Status .AtProvider .Version
0 commit comments