Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing common integration helper and change package name #8327

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"knative.dev/eventing/pkg/reconciler/channel"
"knative.dev/eventing/pkg/reconciler/containersource"
"knative.dev/eventing/pkg/reconciler/eventtype"
"knative.dev/eventing/pkg/reconciler/integrationsource"
integrationsource "knative.dev/eventing/pkg/reconciler/integration/source"
"knative.dev/eventing/pkg/reconciler/parallel"
"knative.dev/eventing/pkg/reconciler/pingsource"
"knative.dev/eventing/pkg/reconciler/sequence"
Expand Down
126 changes: 126 additions & 0 deletions pkg/reconciler/integration/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"fmt"
"reflect"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
)

func GenerateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar {
var envVars []corev1.EnvVar

// Use reflection to inspect the struct fields
v := reflect.ValueOf(s)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}

t := v.Type()

for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
fieldType := t.Field(i)

// Skip unexported fields
if !field.CanInterface() {
continue
}

// Handle embedded/anonymous structs recursively
if fieldType.Anonymous && field.Kind() == reflect.Struct {
// Recursively handle embedded structs with the same prefix
envVars = append(envVars, GenerateEnvVarsFromStruct(prefix, field.Interface())...)
continue
}

// First, check for the custom 'camel' tag
envVarName := fieldType.Tag.Get("camel")
if envVarName == "" {
// If 'camel' tag is not present, fall back to the 'json' tag or Go field name
jsonTag := fieldType.Tag.Get("json")
tagName := strings.Split(jsonTag, ",")[0]
if tagName == "" || tagName == "-" {
tagName = fieldType.Name
}
envVarName = fmt.Sprintf("%s_%s", prefix, strings.ToUpper(tagName))
}

if field.Kind() == reflect.Ptr {
if field.IsNil() {
continue
}
field = field.Elem()
}

var value string
switch field.Kind() {
case reflect.Int, reflect.Int32, reflect.Int64:
value = strconv.FormatInt(field.Int(), 10)
case reflect.Bool:
value = strconv.FormatBool(field.Bool())
case reflect.String:
value = field.String()
default:
// Skip unsupported types
continue
}

// Skip zero/empty values
if value == "" {
continue
}

envVars = append(envVars, corev1.EnvVar{
Name: envVarName,
Value: value,
})
}

return envVars
}

func MakeSecretEnvVar(name, key, secretName string) corev1.EnvVar {
return corev1.EnvVar{
Name: name,
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
Key: key,
LocalObjectReference: corev1.LocalObjectReference{
Name: secretName,
},
},
},
}
}

func MakeSSLEnvVar() []corev1.EnvVar {
return []corev1.EnvVar{
{
Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED",
Value: "true",
},
{
Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH",
Value: "/knative-custom-certs/knative-eventing-bundle.pem",
},
}
}
77 changes: 77 additions & 0 deletions pkg/reconciler/integration/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
)

func TestGenerateEnvVarsFromStruct(t *testing.T) {
type TestStruct struct {
Field1 int `json:"field1"`
Field2 bool `json:"field2"`
Field3 string `json:"field3"`
}

prefix := "TEST_PREFIX"
input := &TestStruct{
Field1: 123,
Field2: true,
Field3: "hello",
}

// Expected environment variables including SSL settings
want := []corev1.EnvVar{
{Name: "TEST_PREFIX_FIELD1", Value: "123"},
{Name: "TEST_PREFIX_FIELD2", Value: "true"},
{Name: "TEST_PREFIX_FIELD3", Value: "hello"},
}

got := GenerateEnvVarsFromStruct(prefix, input)

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("generateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff)
}
}

func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) {
type AWSS3 struct {
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"`
Region string `json:"region,omitempty"`
}

prefix := "CAMEL_KAMELET_AWS_S3_SOURCE"
input := AWSS3{
Arn: "arn:aws:s3:::example-bucket",
Region: "us-west-2",
}

// Expected environment variables including SSL settings and camel tag for Arn
want := []corev1.EnvVar{
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
}

got := GenerateEnvVarsFromStruct(prefix, input)

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource
package source

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource
package source

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource
package source

import (
"context"
"fmt"

"knative.dev/eventing/pkg/reconciler/integration/source/resources"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -32,7 +34,6 @@ import (
"knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/integrationsource"
v1listers "knative.dev/eventing/pkg/client/listers/sources/v1"
listers "knative.dev/eventing/pkg/client/listers/sources/v1alpha1"
"knative.dev/eventing/pkg/reconciler/integrationsource/resources"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource
package source

import (
"fmt"
Expand Down
133 changes: 133 additions & 0 deletions pkg/reconciler/integration/source/resources/containersource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
Copyright 2024 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resources

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/reconciler/integration"
"knative.dev/pkg/kmeta"
)

const (
awsAccessKey = "aws.accessKey"
awsSecretKey = "aws.secretKey"
)

func NewContainerSource(source *v1alpha1.IntegrationSource) *sourcesv1.ContainerSource {
return &sourcesv1.ContainerSource{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: []metav1.OwnerReference{
*kmeta.NewControllerRef(source),
},
Name: ContainerSourceName(source),
Namespace: source.Namespace,
},
Spec: sourcesv1.ContainerSourceSpec{

Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "source",
Image: selectImage(source),
ImagePullPolicy: corev1.PullIfNotPresent,
Env: makeEnv(source),
},
},
},
},
SourceSpec: source.Spec.SourceSpec,
},
}
}

// Function to create environment variables for Timer or AWS configurations dynamically
func makeEnv(source *v1alpha1.IntegrationSource) []corev1.EnvVar {
var envVars = integration.MakeSSLEnvVar()

// Timer environment variables
if source.Spec.Timer != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_TIMER_SOURCE", *source.Spec.Timer)...)
return envVars
}

// Handle secret name only if AWS is configured
var secretName string
if source.Spec.Aws != nil && source.Spec.Aws.Auth != nil && source.Spec.Aws.Auth.Secret != nil && source.Spec.Aws.Auth.Secret.Ref != nil {
secretName = source.Spec.Aws.Auth.Secret.Ref.Name
}

// AWS S3 environment variables
if source.Spec.Aws != nil && source.Spec.Aws.S3 != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_S3_SOURCE", *source.Spec.Aws.S3)...)
if secretName != "" {
envVars = append(envVars, []corev1.EnvVar{
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_ACCESSKEY", awsAccessKey, secretName),
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_S3_SOURCE_SECRETKEY", awsSecretKey, secretName),
}...)
}
return envVars
}

// AWS SQS environment variables
if source.Spec.Aws != nil && source.Spec.Aws.SQS != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_SQS_SOURCE", *source.Spec.Aws.SQS)...)
if secretName != "" {
envVars = append(envVars, []corev1.EnvVar{
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_ACCESSKEY", awsAccessKey, secretName),
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_SQS_SOURCE_SECRETKEY", awsSecretKey, secretName),
}...)
}
return envVars
}

// AWS DynamoDB Streams environment variables
if source.Spec.Aws != nil && source.Spec.Aws.DDBStreams != nil {
envVars = append(envVars, integration.GenerateEnvVarsFromStruct("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE", *source.Spec.Aws.DDBStreams)...)
if secretName != "" {
envVars = append(envVars, []corev1.EnvVar{
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_ACCESSKEY", awsAccessKey, secretName),
integration.MakeSecretEnvVar("CAMEL_KAMELET_AWS_DDB_STREAMS_SOURCE_SECRETKEY", awsSecretKey, secretName),
}...)
}
return envVars
}

// If no valid configuration is found, return empty envVars
return envVars
}

func selectImage(source *v1alpha1.IntegrationSource) string {
if source.Spec.Timer != nil {
return "gcr.io/knative-nightly/timer-source:latest"
}
if source.Spec.Aws != nil {
if source.Spec.Aws.S3 != nil {
return "gcr.io/knative-nightly/aws-s3-source:latest"
}
if source.Spec.Aws.SQS != nil {
return "gcr.io/knative-nightly/aws-sqs-source:latest"
}
if source.Spec.Aws.DDBStreams != nil {
return "gcr.io/knative-nightly/aws-ddb-streams-source:latest"
}
}
return ""
}
Loading
Loading