@@ -3,42 +3,93 @@ package kafkacontroller
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "github.com/exoscale/egoscale/v2/oapi"
6
7
"strings"
7
8
9
+ exoscalesdk "github.com/exoscale/egoscale/v2"
8
10
exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
11
+ "github.com/vshn/provider-exoscale/operator/pipelineutil"
9
12
"github.com/vshn/provider-exoscale/operator/webhook"
10
13
ctrl "sigs.k8s.io/controller-runtime"
14
+ "sigs.k8s.io/controller-runtime/pkg/client"
11
15
12
16
"github.com/go-logr/logr"
13
17
"k8s.io/apimachinery/pkg/runtime"
14
18
)
15
19
20
+ const serviceType = "kafka"
21
+
16
22
// SetupWebhook adds a webhook for kafka resources.
17
23
func SetupWebhook (mgr ctrl.Manager ) error {
18
24
return ctrl .NewWebhookManagedBy (mgr ).
19
25
For (& exoscalev1.Kafka {}).
20
26
WithValidator (& Validator {
21
- log : mgr .GetLogger ().WithName ("webhook" ).WithName (strings .ToLower (exoscalev1 .KafkaKind )),
27
+ log : mgr .GetLogger ().WithName ("webhook" ).WithName (strings .ToLower (exoscalev1 .KafkaKind )),
28
+ kube : mgr .GetClient (),
22
29
}).
23
30
Complete ()
24
31
}
25
32
26
33
// Validator validates kafka admission requests.
27
34
type Validator struct {
28
- log logr.Logger
35
+ log logr.Logger
36
+ kube client.Client
29
37
}
30
38
31
39
// ValidateCreate validates the spec of a created kafka resource.
32
- func (v * Validator ) ValidateCreate (_ context.Context , obj runtime.Object ) error {
40
+ func (v * Validator ) ValidateCreate (ctx context.Context , obj runtime.Object ) error {
41
+ instance := obj .(* exoscalev1.Kafka )
42
+ v .log .V (1 ).Info ("get kafka available versions" )
43
+ exo , err := pipelineutil .OpenExoscaleClient (ctx , v .kube , instance .GetProviderConfigName (), exoscalesdk .ClientOptWithAPIEndpoint (fmt .Sprintf ("https://api-%s.exoscale.com" , instance .Spec .ForProvider .Zone )))
44
+ if err != nil {
45
+ return fmt .Errorf ("open exoscale client failed: %w" , err )
46
+ }
47
+ return v .validateCreateWithExoClient (ctx , obj , exo .Exoscale )
48
+ }
49
+
50
+ func (v * Validator ) validateCreateWithExoClient (ctx context.Context , obj runtime.Object , exo oapi.ClientWithResponsesInterface ) error {
33
51
instance , ok := obj .(* exoscalev1.Kafka )
34
52
if ! ok {
35
53
return fmt .Errorf ("invalid managed resource type %T for kafka webhook" , obj )
36
54
}
37
55
v .log .V (2 ).WithValues ("instance" , instance ).Info ("validate create" )
38
56
57
+ availableVersions , err := v .getAvailableVersions (ctx , exo )
58
+ if err != nil {
59
+ return err
60
+ }
61
+
62
+ err = v .validateVersion (ctx , obj , * availableVersions )
63
+ if err != nil {
64
+ return err
65
+ }
66
+
39
67
return validateSpec (instance .Spec .ForProvider )
40
68
}
41
69
70
+ func (v * Validator ) getAvailableVersions (ctx context.Context , exo oapi.ClientWithResponsesInterface ) (* []string , error ) {
71
+ // get kafka available versions
72
+ resp , err := exo .GetDbaasServiceTypeWithResponse (ctx , serviceType )
73
+ if err != nil {
74
+ return nil , fmt .Errorf ("get DBaaS service type failed: %w" , err )
75
+ }
76
+
77
+ v .log .V (1 ).Info ("DBaaS service type" , "body" , string (resp .Body ))
78
+
79
+ serviceType := * resp .JSON200
80
+ if serviceType .AvailableVersions == nil {
81
+ return nil , fmt .Errorf ("kafka available versions not found" )
82
+ }
83
+ return serviceType .AvailableVersions , nil
84
+ }
85
+
86
+ func (v * Validator ) validateVersion (_ context.Context , obj runtime.Object , availableVersions []string ) error {
87
+ instance := obj .(* exoscalev1.Kafka )
88
+
89
+ v .log .V (1 ).Info ("validate version" )
90
+ return webhook .ValidateVersions (instance .Spec .ForProvider .Version , availableVersions )
91
+ }
92
+
42
93
// ValidateUpdate validates the spec of an updated kafka resource and checks that no immutable field has been modified.
43
94
func (v * Validator ) ValidateUpdate (_ context.Context , oldObj , newObj runtime.Object ) error {
44
95
newInstance , ok := newObj .(* exoscalev1.Kafka )
@@ -47,7 +98,7 @@ func (v *Validator) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Obj
47
98
}
48
99
oldInstance , ok := oldObj .(* exoscalev1.Kafka )
49
100
if ! ok {
50
- return fmt .Errorf ("invalid managed resource type %T for kafka webhook" , newObj )
101
+ return fmt .Errorf ("invalid managed resource type %T for kafka webhook" , oldObj )
51
102
}
52
103
v .log .V (2 ).WithValues ("old" , oldInstance , "new" , newInstance ).Info ("VALIDATE update" )
53
104
@@ -111,13 +162,9 @@ func compareVersion(oldInst, newInst exoscalev1.Kafka) error {
111
162
if oldInst .Spec .ForProvider .Version == newInst .Spec .ForProvider .Version {
112
163
return nil
113
164
}
114
- if newInst .Spec .ForProvider .Version == "" {
115
- // Setting version to empyt string should always be fine
116
- return nil
117
- }
118
165
if oldInst .Spec .ForProvider .Version == "" {
119
166
// Fall back to reported version if no version was set before
120
167
oldInst .Spec .ForProvider .Version = oldInst .Status .AtProvider .Version
121
168
}
122
- return webhook .ValidateVersion (oldInst .Status .AtProvider .Version , oldInst .Spec .ForProvider .Version , newInst .Spec .ForProvider .Version )
169
+ return webhook .ValidateUpdateVersion (oldInst .Status .AtProvider .Version , oldInst .Spec .ForProvider .Version , newInst .Spec .ForProvider .Version )
123
170
}
0 commit comments