Skip to content

Commit 3dd445d

Browse files
committed
Add KafkaSink E2E tests
Signed-off-by: Pierangelo Di Pilato <[email protected]>
1 parent d3cbc35 commit 3dd445d

File tree

7 files changed

+172
-139
lines changed

7 files changed

+172
-139
lines changed

control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_defaults.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (ks *KafkaSink) SetDefaults(ctx context.Context) {
2727
func (kss *KafkaSinkSpec) SetDefaults(ctx context.Context) {
2828
defaultMode := ModeStructured
2929

30-
if kss.ContentMode == nil {
30+
if kss.ContentMode == nil || *kss.ContentMode == "" {
3131
kss.ContentMode = &defaultMode
3232
}
3333
}

control-plane/pkg/reconciler/sink/kafka_sink_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func sinkReconciliation(t *testing.T, format string, configs broker.Configs) {
151151
Objects: []runtime.Object{
152152
NewSink(
153153
SinkControllerOwnsTopic,
154-
SinkAuthSecretRef(NewSSLSecret(SinkNamespace, "secret-1")),
154+
SinkAuthSecretRef("secret-1"),
155155
),
156156
NewSSLSecret(SinkNamespace, "secret-1"),
157157
NewConfigMap(&configs, nil),
@@ -196,7 +196,7 @@ func sinkReconciliation(t *testing.T, format string, configs broker.Configs) {
196196
{
197197
Object: NewSink(
198198
SinkControllerOwnsTopic,
199-
SinkAuthSecretRef(NewSSLSecret(SinkNamespace, "secret-1")),
199+
SinkAuthSecretRef("secret-1"),
200200
InitSinkConditions,
201201
SinkDataPlaneAvailable,
202202
SinkConfigParsed,

control-plane/pkg/reconciler/testing/objects_sink.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,12 @@ func SinkControllerDontOwnTopic(sink *eventing.KafkaSink) {
177177
sink.GetStatus().Annotations[base.TopicOwnerAnnotation] = sinkreconciler.ExternalTopicOwner
178178
}
179179

180-
func SinkAuthSecretRef(s *corev1.Secret) func(sink *eventing.KafkaSink) {
180+
func SinkAuthSecretRef(name string) func(sink *eventing.KafkaSink) {
181181
return func(sink *eventing.KafkaSink) {
182182
sink.Spec.Auth = &eventing.Auth{
183183
Secret: &eventing.Secret{
184184
Ref: &eventing.SecretReference{
185-
Name: s.Name,
185+
Name: name,
186186
},
187187
},
188188
}

test/e2e/broker_sasl_ssl_test.go

Lines changed: 11 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,12 @@ import (
3838
duckv1 "knative.dev/pkg/apis/duck/v1"
3939

4040
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka"
41-
testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing"
41+
. "knative.dev/eventing-kafka-broker/test/pkg/testing"
4242
)
4343

44-
const (
45-
kafkaNamespace = "kafka"
46-
tlsUserSecretName = "my-tls-user"
47-
saslUserSecretName = "my-sasl-user"
48-
caSecretName = "my-cluster-cluster-ca-cert"
49-
)
50-
51-
type SecretProvider func(name string, client *testlib.Client) map[string][]byte
52-
53-
type ConfigProvider func(secretName string, client *testlib.Client) map[string]string
54-
5544
func brokerAuth(t *testing.T, secretProvider SecretProvider, configProvider ConfigProvider) {
5645

57-
testingpkg.RunMultiple(t, func(t *testing.T) {
46+
RunMultiple(t, func(t *testing.T) {
5847

5948
ctx := context.Background()
6049

@@ -97,7 +86,7 @@ func brokerAuth(t *testing.T, secretProvider SecretProvider, configProvider Conf
9786
assert.Nil(t, err)
9887
assert.False(t, br.Status.IsReady(), "secret %s/%s doesn't exist, so broker must no be ready", client.Namespace, secretName)
9988

100-
secretData := secretProvider(secretName, client)
89+
secretData := secretProvider(t, client)
10190

10291
secret := &corev1.Secret{
10392
ObjectMeta: metav1.ObjectMeta{
@@ -169,16 +158,12 @@ func TestBrokerAuthPlaintext(t *testing.T) {
169158

170159
brokerAuth(
171160
t,
172-
func(name string, client *testlib.Client) map[string][]byte {
173-
return map[string][]byte{
174-
"protocol": []byte("PLAINTEXT"),
175-
}
176-
},
161+
Plaintext,
177162
func(secretName string, client *testlib.Client) map[string]string {
178163
return map[string]string{
179164
"default.topic.replication.factor": "2",
180165
"default.topic.partitions": "2",
181-
"bootstrap.servers": testingpkg.BootstrapServersPlaintext,
166+
"bootstrap.servers": BootstrapServersPlaintext,
182167
"auth.secret.ref.name": secretName,
183168
}
184169
},
@@ -189,25 +174,12 @@ func TestBrokerAuthSsl(t *testing.T) {
189174

190175
brokerAuth(
191176
t,
192-
func(name string, client *testlib.Client) map[string][]byte {
193-
caSecret, err := client.Kube.CoreV1().Secrets(kafkaNamespace).Get(context.Background(), caSecretName, metav1.GetOptions{})
194-
assert.Nil(t, err)
195-
196-
tlsUserSecret, err := client.Kube.CoreV1().Secrets(kafkaNamespace).Get(context.Background(), tlsUserSecretName, metav1.GetOptions{})
197-
assert.Nil(t, err)
198-
199-
return map[string][]byte{
200-
"protocol": []byte("SSL"),
201-
"ca.crt": caSecret.Data["ca.crt"],
202-
"user.crt": tlsUserSecret.Data["user.crt"],
203-
"user.key": tlsUserSecret.Data["user.key"],
204-
}
205-
},
177+
Ssl,
206178
func(secretName string, client *testlib.Client) map[string]string {
207179
return map[string]string{
208180
"default.topic.replication.factor": "2",
209181
"default.topic.partitions": "2",
210-
"bootstrap.servers": testingpkg.BootstrapServersSsl,
182+
"bootstrap.servers": BootstrapServersSsl,
211183
"auth.secret.ref.name": secretName,
212184
}
213185
},
@@ -218,23 +190,12 @@ func TestBrokerAuthSaslPlaintextScram512(t *testing.T) {
218190

219191
brokerAuth(
220192
t,
221-
func(name string, client *testlib.Client) map[string][]byte {
222-
223-
saslUserSecret, err := client.Kube.CoreV1().Secrets(kafkaNamespace).Get(context.Background(), saslUserSecretName, metav1.GetOptions{})
224-
assert.Nil(t, err)
225-
226-
return map[string][]byte{
227-
"protocol": []byte("SASL_PLAINTEXT"),
228-
"sasl.mechanism": []byte("SCRAM-SHA-512"),
229-
"user": []byte(saslUserSecretName),
230-
"password": saslUserSecret.Data["password"],
231-
}
232-
},
193+
SaslPlaintextScram512,
233194
func(secretName string, client *testlib.Client) map[string]string {
234195
return map[string]string{
235196
"default.topic.replication.factor": "2",
236197
"default.topic.partitions": "2",
237-
"bootstrap.servers": testingpkg.BootstrapServersSaslPlaintext,
198+
"bootstrap.servers": BootstrapServersSaslPlaintext,
238199
"auth.secret.ref.name": secretName,
239200
}
240201
},
@@ -245,26 +206,12 @@ func TestBrokerAuthSslSaslScram512(t *testing.T) {
245206

246207
brokerAuth(
247208
t,
248-
func(name string, client *testlib.Client) map[string][]byte {
249-
caSecret, err := client.Kube.CoreV1().Secrets(kafkaNamespace).Get(context.Background(), caSecretName, metav1.GetOptions{})
250-
assert.Nil(t, err)
251-
252-
saslUserSecret, err := client.Kube.CoreV1().Secrets(kafkaNamespace).Get(context.Background(), saslUserSecretName, metav1.GetOptions{})
253-
assert.Nil(t, err)
254-
255-
return map[string][]byte{
256-
"protocol": []byte("SASL_SSL"),
257-
"sasl.mechanism": []byte("SCRAM-SHA-512"),
258-
"ca.crt": caSecret.Data["ca.crt"],
259-
"user": []byte(saslUserSecretName),
260-
"password": saslUserSecret.Data["password"],
261-
}
262-
},
209+
SslSaslScram512,
263210
func(secretName string, client *testlib.Client) map[string]string {
264211
return map[string]string{
265212
"default.topic.replication.factor": "2",
266213
"default.topic.partitions": "2",
267-
"bootstrap.servers": testingpkg.BootstrapServersSslSaslScram,
214+
"bootstrap.servers": BootstrapServersSslSaslScram,
268215
"auth.secret.ref.name": secretName,
269216
}
270217
},

test/e2e/kafka_sink_test.go

Lines changed: 71 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
package e2e
2020

2121
import (
22+
"context"
2223
"testing"
2324

24-
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
26+
corev1 "k8s.io/api/core/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2528
"k8s.io/apimachinery/pkg/types"
2629
"k8s.io/utils/pointer"
2730
testlib "knative.dev/eventing/test/lib"
@@ -30,99 +33,79 @@ import (
3033
eventingv1alpha1clientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/typed/eventing/v1alpha1"
3134
"knative.dev/eventing-kafka-broker/test/pkg/addressable"
3235
"knative.dev/eventing-kafka-broker/test/pkg/sink"
33-
testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing"
36+
. "knative.dev/eventing-kafka-broker/test/pkg/testing"
3437
)
3538

36-
func TestKafkaSinkV1Alpha1DefaultContentMode(t *testing.T) {
37-
testingpkg.RunMultiple(t, func(t *testing.T) {
38-
39-
client := testlib.Setup(t, false)
40-
defer testlib.TearDown(client)
41-
42-
clientSet, err := eventingv1alpha1clientset.NewForConfig(client.Config)
43-
assert.Nil(t, err)
44-
45-
// Create a KafkaSink with the following spec.
46-
47-
kss := eventingv1alpha1.KafkaSinkSpec{
48-
Topic: "kafka-sink-" + client.Namespace,
49-
NumPartitions: pointer.Int32Ptr(10),
50-
ReplicationFactor: func(rf int16) *int16 { return &rf }(1),
51-
BootstrapServers: testingpkg.BootstrapServersPlaintextArr,
52-
}
53-
54-
createFunc := sink.CreatorV1Alpha1(clientSet, kss)
55-
56-
kafkaSink, err := createFunc(types.NamespacedName{
57-
Namespace: client.Namespace,
58-
Name: "kafka-sink",
59-
})
60-
assert.Nil(t, err)
61-
62-
client.WaitForResourceReadyOrFail(kafkaSink.Name, &kafkaSink.TypeMeta)
63-
64-
// Send events to the KafkaSink.
65-
ids := addressable.Send(t, kafkaSink)
39+
const (
40+
sinkSecretName = "secret-test"
41+
)
6642

67-
// Read events from the topic.
68-
sink.Verify(t, client, eventingv1alpha1.ModeStructured, kss.Topic, ids)
43+
func TestKafkaSinkV1Alpha1DefaultContentMode(t *testing.T) {
44+
testKafkaSink(t, eventingv1alpha1.ModeStructured, nil, func(kss *eventingv1alpha1.KafkaSinkSpec) error {
45+
kss.ContentMode = pointer.StringPtr("")
46+
return nil
6947
})
7048
}
7149

7250
func TestKafkaSinkV1Alpha1StructuredContentMode(t *testing.T) {
73-
testingpkg.RunMultiple(t, func(t *testing.T) {
74-
75-
client := testlib.Setup(t, false)
76-
defer testlib.TearDown(client)
77-
78-
clientSet, err := eventingv1alpha1clientset.NewForConfig(client.Config)
79-
assert.Nil(t, err)
80-
81-
// Create a KafkaSink with the following spec.
82-
83-
kss := eventingv1alpha1.KafkaSinkSpec{
84-
Topic: "kafka-sink-" + client.Namespace,
85-
NumPartitions: pointer.Int32Ptr(10),
86-
ReplicationFactor: func(rf int16) *int16 { return &rf }(1),
87-
BootstrapServers: testingpkg.BootstrapServersPlaintextArr,
88-
ContentMode: pointer.StringPtr(eventingv1alpha1.ModeStructured),
89-
}
51+
testKafkaSink(t, eventingv1alpha1.ModeStructured, nil)
52+
}
9053

91-
createFunc := sink.CreatorV1Alpha1(clientSet, kss)
54+
func TestKafkaSinkV1Alpha1BinaryContentMode(t *testing.T) {
55+
testKafkaSink(t, eventingv1alpha1.ModeBinary, nil)
56+
}
9257

93-
kafkaSink, err := createFunc(types.NamespacedName{
94-
Namespace: client.Namespace,
95-
Name: "kafka-sink",
96-
})
97-
assert.Nil(t, err)
58+
func TestKafkaSinkV1Alpha1AuthPlaintext(t *testing.T) {
59+
testKafkaSink(t, eventingv1alpha1.ModeStructured, Plaintext, withBootstrap(BootstrapServersPlaintextArr), withSecret)
60+
}
9861

99-
client.WaitForResourceReadyOrFail(kafkaSink.Name, &kafkaSink.TypeMeta)
62+
func TestKafkaSinkV1Alpha1AuthSsl(t *testing.T) {
63+
testKafkaSink(t, eventingv1alpha1.ModeStructured, Ssl, withBootstrap(BootstrapServersSslArr), withSecret)
64+
}
10065

101-
// Send events to the KafkaSink.
102-
ids := addressable.Send(t, kafkaSink)
66+
func TestKafkaSinkV1Alpha1AuthSaslPlaintextScram512(t *testing.T) {
67+
testKafkaSink(t, eventingv1alpha1.ModeStructured, SaslPlaintextScram512, withBootstrap(BootstrapServersSaslPlaintextArr), withSecret)
68+
}
10369

104-
// Read events from the topic.
105-
sink.Verify(t, client, eventingv1alpha1.ModeStructured, kss.Topic, ids)
106-
})
70+
func TestKafkaSinkV1Alpha1AuthSslSaslScram512(t *testing.T) {
71+
testKafkaSink(t, eventingv1alpha1.ModeStructured, SslSaslScram512, withBootstrap(BootstrapServersSslSaslScramArr), withSecret)
10772
}
10873

109-
func TestKafkaSinkV1Alpha1BinaryContentMode(t *testing.T) {
110-
testingpkg.RunMultiple(t, func(t *testing.T) {
74+
func testKafkaSink(t *testing.T, mode string, sp SecretProvider, opts ...func(kss *eventingv1alpha1.KafkaSinkSpec) error) {
75+
RunMultiple(t, func(t *testing.T) {
76+
77+
ctx := context.Background()
11178

11279
client := testlib.Setup(t, false)
11380
defer testlib.TearDown(client)
11481

11582
clientSet, err := eventingv1alpha1clientset.NewForConfig(client.Config)
116-
assert.Nil(t, err)
83+
require.Nil(t, err)
11784

11885
// Create a KafkaSink with the following spec.
11986

12087
kss := eventingv1alpha1.KafkaSinkSpec{
12188
Topic: "kafka-sink-" + client.Namespace,
12289
NumPartitions: pointer.Int32Ptr(10),
12390
ReplicationFactor: func(rf int16) *int16 { return &rf }(1),
124-
BootstrapServers: testingpkg.BootstrapServersPlaintextArr,
125-
ContentMode: pointer.StringPtr(eventingv1alpha1.ModeBinary),
91+
BootstrapServers: BootstrapServersPlaintextArr,
92+
ContentMode: pointer.StringPtr(mode),
93+
}
94+
for _, opt := range opts {
95+
require.Nil(t, opt(&kss))
96+
}
97+
98+
if sp != nil {
99+
secretData := sp(t, client)
100+
secret := &corev1.Secret{
101+
ObjectMeta: metav1.ObjectMeta{
102+
Namespace: client.Namespace,
103+
Name: sinkSecretName,
104+
},
105+
Data: secretData,
106+
}
107+
_, err = client.Kube.CoreV1().Secrets(client.Namespace).Create(ctx, secret, metav1.CreateOptions{})
108+
require.Nil(t, err)
126109
}
127110

128111
createFunc := sink.CreatorV1Alpha1(clientSet, kss)
@@ -131,14 +114,32 @@ func TestKafkaSinkV1Alpha1BinaryContentMode(t *testing.T) {
131114
Namespace: client.Namespace,
132115
Name: "kafka-sink",
133116
})
134-
assert.Nil(t, err)
117+
require.Nil(t, err)
135118

136119
client.WaitForResourceReadyOrFail(kafkaSink.Name, &kafkaSink.TypeMeta)
137120

138121
// Send events to the KafkaSink.
139122
ids := addressable.Send(t, kafkaSink)
140123

141124
// Read events from the topic.
142-
sink.Verify(t, client, eventingv1alpha1.ModeBinary, kss.Topic, ids)
125+
sink.Verify(t, client, mode, kss.Topic, ids)
143126
})
144127
}
128+
129+
func withSecret(kss *eventingv1alpha1.KafkaSinkSpec) error {
130+
kss.Auth = &eventingv1alpha1.Auth{
131+
Secret: &eventingv1alpha1.Secret{
132+
Ref: &eventingv1alpha1.SecretReference{
133+
Name: sinkSecretName,
134+
},
135+
},
136+
}
137+
return nil
138+
}
139+
140+
func withBootstrap(bs []string) func(kss *eventingv1alpha1.KafkaSinkSpec) error {
141+
return func(kss *eventingv1alpha1.KafkaSinkSpec) error {
142+
kss.BootstrapServers = bs
143+
return nil
144+
}
145+
}

0 commit comments

Comments
 (0)