Skip to content

Commit ebe99e6

Browse files
authored
Introducing common integration helper and change package name (#8327)
* create new integration/source folder and factor common code into helper.go Signed-off-by: Matthias Wessendorf <[email protected]> * 💄 improve formatting... Signed-off-by: Matthias Wessendorf <[email protected]> * fixing import Signed-off-by: Matthias Wessendorf <[email protected]> --------- Signed-off-by: Matthias Wessendorf <[email protected]>
1 parent 201e096 commit ebe99e6

File tree

11 files changed

+343
-296
lines changed

11 files changed

+343
-296
lines changed

cmd/controller/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
"knative.dev/eventing/pkg/reconciler/channel"
4242
"knative.dev/eventing/pkg/reconciler/containersource"
4343
"knative.dev/eventing/pkg/reconciler/eventtype"
44-
"knative.dev/eventing/pkg/reconciler/integrationsource"
44+
integrationsource "knative.dev/eventing/pkg/reconciler/integration/source"
4545
"knative.dev/eventing/pkg/reconciler/parallel"
4646
"knative.dev/eventing/pkg/reconciler/pingsource"
4747
"knative.dev/eventing/pkg/reconciler/sequence"

pkg/reconciler/integration/helper.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
Copyright 2024 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package integration
18+
19+
import (
20+
"fmt"
21+
"reflect"
22+
"strconv"
23+
"strings"
24+
25+
corev1 "k8s.io/api/core/v1"
26+
)
27+
28+
func GenerateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar {
29+
var envVars []corev1.EnvVar
30+
31+
// Use reflection to inspect the struct fields
32+
v := reflect.ValueOf(s)
33+
if v.Kind() == reflect.Ptr {
34+
v = v.Elem()
35+
}
36+
37+
t := v.Type()
38+
39+
for i := 0; i < v.NumField(); i++ {
40+
field := v.Field(i)
41+
fieldType := t.Field(i)
42+
43+
// Skip unexported fields
44+
if !field.CanInterface() {
45+
continue
46+
}
47+
48+
// Handle embedded/anonymous structs recursively
49+
if fieldType.Anonymous && field.Kind() == reflect.Struct {
50+
// Recursively handle embedded structs with the same prefix
51+
envVars = append(envVars, GenerateEnvVarsFromStruct(prefix, field.Interface())...)
52+
continue
53+
}
54+
55+
// First, check for the custom 'camel' tag
56+
envVarName := fieldType.Tag.Get("camel")
57+
if envVarName == "" {
58+
// If 'camel' tag is not present, fall back to the 'json' tag or Go field name
59+
jsonTag := fieldType.Tag.Get("json")
60+
tagName := strings.Split(jsonTag, ",")[0]
61+
if tagName == "" || tagName == "-" {
62+
tagName = fieldType.Name
63+
}
64+
envVarName = fmt.Sprintf("%s_%s", prefix, strings.ToUpper(tagName))
65+
}
66+
67+
if field.Kind() == reflect.Ptr {
68+
if field.IsNil() {
69+
continue
70+
}
71+
field = field.Elem()
72+
}
73+
74+
var value string
75+
switch field.Kind() {
76+
case reflect.Int, reflect.Int32, reflect.Int64:
77+
value = strconv.FormatInt(field.Int(), 10)
78+
case reflect.Bool:
79+
value = strconv.FormatBool(field.Bool())
80+
case reflect.String:
81+
value = field.String()
82+
default:
83+
// Skip unsupported types
84+
continue
85+
}
86+
87+
// Skip zero/empty values
88+
if value == "" {
89+
continue
90+
}
91+
92+
envVars = append(envVars, corev1.EnvVar{
93+
Name: envVarName,
94+
Value: value,
95+
})
96+
}
97+
98+
return envVars
99+
}
100+
101+
func MakeSecretEnvVar(name, key, secretName string) corev1.EnvVar {
102+
return corev1.EnvVar{
103+
Name: name,
104+
ValueFrom: &corev1.EnvVarSource{
105+
SecretKeyRef: &corev1.SecretKeySelector{
106+
Key: key,
107+
LocalObjectReference: corev1.LocalObjectReference{
108+
Name: secretName,
109+
},
110+
},
111+
},
112+
}
113+
}
114+
115+
func MakeSSLEnvVar() []corev1.EnvVar {
116+
return []corev1.EnvVar{
117+
{
118+
Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED",
119+
Value: "true",
120+
},
121+
{
122+
Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH",
123+
Value: "/knative-custom-certs/knative-eventing-bundle.pem",
124+
},
125+
}
126+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
Copyright 2024 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package integration
18+
19+
import (
20+
"testing"
21+
22+
"github.com/google/go-cmp/cmp"
23+
corev1 "k8s.io/api/core/v1"
24+
)
25+
26+
func TestGenerateEnvVarsFromStruct(t *testing.T) {
27+
type TestStruct struct {
28+
Field1 int `json:"field1"`
29+
Field2 bool `json:"field2"`
30+
Field3 string `json:"field3"`
31+
}
32+
33+
prefix := "TEST_PREFIX"
34+
input := &TestStruct{
35+
Field1: 123,
36+
Field2: true,
37+
Field3: "hello",
38+
}
39+
40+
// Expected environment variables including SSL settings
41+
want := []corev1.EnvVar{
42+
{Name: "TEST_PREFIX_FIELD1", Value: "123"},
43+
{Name: "TEST_PREFIX_FIELD2", Value: "true"},
44+
{Name: "TEST_PREFIX_FIELD3", Value: "hello"},
45+
}
46+
47+
got := GenerateEnvVarsFromStruct(prefix, input)
48+
49+
if diff := cmp.Diff(want, got); diff != "" {
50+
t.Errorf("generateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff)
51+
}
52+
}
53+
54+
func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) {
55+
type AWSS3 struct {
56+
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"`
57+
Region string `json:"region,omitempty"`
58+
}
59+
60+
prefix := "CAMEL_KAMELET_AWS_S3_SOURCE"
61+
input := AWSS3{
62+
Arn: "arn:aws:s3:::example-bucket",
63+
Region: "us-west-2",
64+
}
65+
66+
// Expected environment variables including SSL settings and camel tag for Arn
67+
want := []corev1.EnvVar{
68+
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
69+
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
70+
}
71+
72+
got := GenerateEnvVarsFromStruct(prefix, input)
73+
74+
if diff := cmp.Diff(want, got); diff != "" {
75+
t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff)
76+
}
77+
}

pkg/reconciler/integrationsource/controller.go renamed to pkg/reconciler/integration/source/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package integrationsource
17+
package source
1818

1919
import (
2020
"context"

pkg/reconciler/integrationsource/controller_test.go renamed to pkg/reconciler/integration/source/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package integrationsource
17+
package source
1818

1919
import (
2020
"context"

pkg/reconciler/integrationsource/integrationsource.go renamed to pkg/reconciler/integration/source/integrationsource.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package integrationsource
17+
package source
1818

1919
import (
2020
"context"
2121
"fmt"
2222

23+
"knative.dev/eventing/pkg/reconciler/integration/source/resources"
24+
2325
"go.uber.org/zap"
2426
corev1 "k8s.io/api/core/v1"
2527
"k8s.io/apimachinery/pkg/api/equality"
@@ -32,7 +34,6 @@ import (
3234
"knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/integrationsource"
3335
v1listers "knative.dev/eventing/pkg/client/listers/sources/v1"
3436
listers "knative.dev/eventing/pkg/client/listers/sources/v1alpha1"
35-
"knative.dev/eventing/pkg/reconciler/integrationsource/resources"
3637
"knative.dev/pkg/controller"
3738
"knative.dev/pkg/logging"
3839
pkgreconciler "knative.dev/pkg/reconciler"

pkg/reconciler/integrationsource/integrationsource_test.go renamed to pkg/reconciler/integration/source/integrationsource_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package integrationsource
17+
package source
1818

1919
import (
2020
"fmt"
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
Copyright 2024 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package resources
18+
19+
import (
20+
corev1 "k8s.io/api/core/v1"
21+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22+
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
23+
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
24+
"knative.dev/eventing/pkg/reconciler/integration"
25+
"knative.dev/pkg/kmeta"
26+
)
27+
28+
const (
29+
awsAccessKey = "aws.accessKey"
30+
awsSecretKey = "aws.secretKey"
31+
)
32+
33+
func NewContainerSource(source *v1alpha1.IntegrationSource) *sourcesv1.ContainerSource {
34+
return &sourcesv1.ContainerSource{
35+
ObjectMeta: metav1.ObjectMeta{
36+
OwnerReferences: []metav1.OwnerReference{
37+
*kmeta.NewControllerRef(source),
38+
},
39+
Name: ContainerSourceName(source),
40+
Namespace: source.Namespace,
41+
},
42+
Spec: sourcesv1.ContainerSourceSpec{
43+
44+
Template: corev1.PodTemplateSpec{
45+
Spec: corev1.PodSpec{
46+
Containers: []corev1.Container{
47+
{
48+
Name: "source",
49+
Image: selectImage(source),
50+
ImagePullPolicy: corev1.PullIfNotPresent,
51+
Env: makeEnv(source),
52+
},
53+
},
54+
},
55+
},
56+
SourceSpec: source.Spec.SourceSpec,
57+
},
58+
}
59+
}
60+
61+
// Function to create environment variables for Timer or AWS configurations dynamically
62+
func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar {
63+
var envVars = integration.MakeSSLEnvVar()
64+
65+
// Timer environment variables
66+
if source.Spec.Timer != nil {
67+
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_TIMER_SOURCE", *source.Spec.Timer)...)
68+
return envVars
69+
}
70+
71+
// Handle secret name only if AWS is configured
72+
var secretName string
73+
if source.Spec.Aws != nil && source.Spec.Aws.Auth != nil && source.Spec.Aws.Auth.Secret != nil && source.Spec.Aws.Auth.Secret.Ref != nil {
74+
secretName = source.Spec.Aws.Auth.Secret.Ref.Name
75+
}
76+
77+
// AWS S3 environment variables
78+
if source.Spec.Aws != nil && source.Spec.Aws.S3 != nil {
79+
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_S3_SOURCE", *source.Spec.Aws.S3)...)
80+
if secretName != "" {
81+
envVars = append(envVars, []corev1.EnvVar{
82+
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_ACCESSKEY", awsAccessKey, secretName),
83+
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_SECRETKEY", awsSecretKey, secretName),
84+
}...)
85+
}
86+
return envVars
87+
}
88+
89+
// AWS SQS environment variables
90+
if source.Spec.Aws != nil && source.Spec.Aws.SQS != nil {
91+
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SQS_SOURCE", *source.Spec.Aws.SQS)...)
92+
if secretName != "" {
93+
envVars = append(envVars, []corev1.EnvVar{
94+
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_ACCESSKEY", awsAccessKey, secretName),
95+
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_SECRETKEY", awsSecretKey, secretName),
96+
}...)
97+
}
98+
return envVars
99+
}
100+
101+
// AWS DynamoDB Streams environment variables
102+
if source.Spec.Aws != nil && source.Spec.Aws.DDBStreams != nil {
103+
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE", *source.Spec.Aws.DDBStreams)...)
104+
if secretName != "" {
105+
envVars = append(envVars, []corev1.EnvVar{
106+
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_ACCESSKEY", awsAccessKey, secretName),
107+
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_SECRETKEY", awsSecretKey, secretName),
108+
}...)
109+
}
110+
return envVars
111+
}
112+
113+
// If no valid configuration is found, return empty envVars
114+
return envVars
115+
}
116+
117+
func selectImage(source *v1alpha1.IntegrationSource) string {
118+
if source.Spec.Timer != nil {
119+
return "gcr.io/knative-nightly/timer-source:latest"
120+
}
121+
if source.Spec.Aws != nil {
122+
if source.Spec.Aws.S3 != nil {
123+
return "gcr.io/knative-nightly/aws-s3-source:latest"
124+
}
125+
if source.Spec.Aws.SQS != nil {
126+
return "gcr.io/knative-nightly/aws-sqs-source:latest"
127+
}
128+
if source.Spec.Aws.DDBStreams != nil {
129+
return "gcr.io/knative-nightly/aws-ddb-streams-source:latest"
130+
}
131+
}
132+
return ""
133+
}

0 commit comments

Comments
 (0)