diff --git a/docs/eventing-api.md b/docs/eventing-api.md
index e4885cfcc9c..d1c521104f8 100644
--- a/docs/eventing-api.md
+++ b/docs/eventing-api.md
@@ -6354,7 +6354,7 @@ JobSinkStatus
s3
-knative.dev/eventing/pkg/apis/common.AWSS3
+knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSS3
|
@@ -6364,7 +6364,7 @@ knative.dev/eventing/pkg/apis/common.AWSS3
|
sqs
-knative.dev/eventing/pkg/apis/common.AWSSQS
+knative.dev/eventing/pkg/apis/common/integration/v1alpha1.AWSSQS
|
@@ -6375,7 +6375,7 @@ knative.dev/eventing/pkg/apis/common.AWSSQS
|
auth
-knative.dev/eventing/pkg/apis/common.Auth
+knative.dev/eventing/pkg/apis/common/integration/v1alpha1.Auth
|
diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go
index 41bdc3137ce..fd605809c77 100644
--- a/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go
+++ b/pkg/apis/sinks/v1alpha1/integration_sink_defaults_test.go
@@ -18,8 +18,9 @@ package v1alpha1
import (
"context"
- "github.com/google/go-cmp/cmp"
"testing"
+
+ "github.com/google/go-cmp/cmp"
)
func TestIntegrationSinkSetDefaults(t *testing.T) {
diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go
index de5849fc3cc..1ad33e2cae9 100644
--- a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go
+++ b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle.go
@@ -93,7 +93,7 @@ func (s *IntegrationSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageF
IntegrationSinkCondSet.Manage(s).MarkTrueWithReason(IntegrationSinkConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
-func (s *IntegrationSinkStatus) PropagateDeploymentAvailability(d *appsv1.DeploymentStatus) {
+func (s *IntegrationSinkStatus) PropagateDeploymentStatus(d *appsv1.DeploymentStatus) {
deploymentAvailableFound := false
for _, cond := range d.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go
index 4cb2902666d..627b3b13c6c 100644
--- a/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go
+++ b/pkg/apis/sinks/v1alpha1/integration_sink_lifecycle_test.go
@@ -17,11 +17,12 @@ limitations under the License.
package v1alpha1
import (
+ "testing"
+
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
- "testing"
)
func TestIntegrationSinkGetConditionSet(t *testing.T) {
@@ -45,6 +46,9 @@ func TestIntegrationSinkInitializeConditions(t *testing.T) {
Conditions: []apis.Condition{{
Type: IntegrationSinkConditionAddressable,
Status: corev1.ConditionUnknown,
+ }, {
+ Type: IntegrationSinkConditionDeploymentReady,
+ Status: corev1.ConditionUnknown,
}, {
Type: IntegrationSinkConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
@@ -69,6 +73,9 @@ func TestIntegrationSinkInitializeConditions(t *testing.T) {
Conditions: []apis.Condition{{
Type: IntegrationSinkConditionAddressable,
Status: corev1.ConditionFalse,
+ }, {
+ Type: IntegrationSinkConditionDeploymentReady,
+ Status: corev1.ConditionUnknown,
}, {
Type: IntegrationSinkConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
@@ -93,6 +100,9 @@ func TestIntegrationSinkInitializeConditions(t *testing.T) {
Conditions: []apis.Condition{{
Type: IntegrationSinkConditionAddressable,
Status: corev1.ConditionTrue,
+ }, {
+ Type: IntegrationSinkConditionDeploymentReady,
+ Status: corev1.ConditionUnknown,
}, {
Type: IntegrationSinkConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_types.go b/pkg/apis/sinks/v1alpha1/integration_sink_types.go
index 95d0366cf9c..5e2dbb46fd0 100644
--- a/pkg/apis/sinks/v1alpha1/integration_sink_types.go
+++ b/pkg/apis/sinks/v1alpha1/integration_sink_types.go
@@ -20,7 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
- "knative.dev/eventing/pkg/apis/common"
+ "knative.dev/eventing/pkg/apis/common/integration/v1alpha1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
@@ -74,9 +74,9 @@ type Log struct {
}
type Aws struct {
- S3 *common.AWSS3 `json:"s3,omitempty"` // S3 source configuration
- SQS *common.AWSSQS `json:"sqs,omitempty"` // SQS source configuration
- Auth *common.Auth `json:"auth,omitempty"`
+ S3 *v1alpha1.AWSS3 `json:"s3,omitempty"` // S3 source configuration
+ SQS *v1alpha1.AWSSQS `json:"sqs,omitempty"` // SQS source configuration
+ Auth *v1alpha1.Auth `json:"auth,omitempty"`
}
type IntegrationSinkStatus struct {
diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go
index d148aa28f90..27efddffeca 100644
--- a/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go
+++ b/pkg/apis/sinks/v1alpha1/integration_sink_types_test.go
@@ -17,8 +17,9 @@ limitations under the License.
package v1alpha1
import (
- "knative.dev/eventing/pkg/apis/common"
"testing"
+
+ "knative.dev/eventing/pkg/apis/common/integration/v1alpha1"
)
func TestIntegrationSink_GetStatus(t *testing.T) {
@@ -54,30 +55,30 @@ func TestLog(t *testing.T) {
}
func TestAWS(t *testing.T) {
- s3 := common.AWSS3{
- AWSCommon: common.AWSCommon{
+ s3 := v1alpha1.AWSS3{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "eu-north-1",
},
- BucketNameOrArn: "example-bucket",
+ Arn: "example-bucket",
}
if s3.Region != "eu-north-1" {
t.Errorf("AWSS3.Region = %v, want 'eu-north-1'", s3.Region)
}
- sqs := common.AWSSQS{
- AWSCommon: common.AWSCommon{
+ sqs := v1alpha1.AWSSQS{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "eu-north-1",
},
- QueueNameOrArn: "example-queue",
+ Arn: "example-queue",
}
if sqs.Region != "eu-north-1" {
t.Errorf("AWSSQS.Region = %v, want 'eu-north-1'", sqs.Region)
}
- ddbStreams := common.AWSDDBStreams{
- AWSCommon: common.AWSCommon{
+ ddbStreams := v1alpha1.AWSDDBStreams{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "eu-north-1",
},
Table: "example-table",
@@ -90,9 +91,9 @@ func TestAWS(t *testing.T) {
// TestAuthFieldAccess tests the HasAuth method and field access in Auth struct
func TestAuthFieldAccess(t *testing.T) {
- auth := common.Auth{
- Secret: &common.Secret{
- Ref: &common.SecretReference{
+ auth := v1alpha1.Auth{
+ Secret: &v1alpha1.Secret{
+ Ref: &v1alpha1.SecretReference{
Name: "aws-secret",
},
},
diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation.go
index 40d02310540..c96b83d7dbe 100644
--- a/pkg/apis/sinks/v1alpha1/integration_sink_validation.go
+++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation.go
@@ -18,6 +18,7 @@ package v1alpha1
import (
"context"
+
"knative.dev/pkg/apis"
)
@@ -61,7 +62,7 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError
// Additional validation for AWS S3 required fields
if spec.Aws.S3 != nil {
- if spec.Aws.S3.BucketNameOrArn == "" {
+ if spec.Aws.S3.Arn == "" {
errs = errs.Also(apis.ErrMissingField("aws.s3.bucketNameOrArn"))
}
if spec.Aws.S3.Region == "" {
@@ -71,7 +72,7 @@ func (spec *IntegrationSinkSpec) Validate(ctx context.Context) *apis.FieldError
// Additional validation for AWS SQS required fields
if spec.Aws.SQS != nil {
- if spec.Aws.SQS.QueueNameOrArn == "" {
+ if spec.Aws.SQS.Arn == "" {
errs = errs.Also(apis.ErrMissingField("aws.sqs.queueNameOrArn"))
}
if spec.Aws.SQS.Region == "" {
diff --git a/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go
index d984eadd4e6..cba73a420b7 100644
--- a/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go
+++ b/pkg/apis/sinks/v1alpha1/integration_sink_validation_test.go
@@ -18,9 +18,10 @@ package v1alpha1
import (
"context"
- "knative.dev/eventing/pkg/apis/common"
"testing"
+ "knative.dev/eventing/pkg/apis/common/integration/v1alpha1"
+
"github.com/google/go-cmp/cmp"
"knative.dev/pkg/apis"
)
@@ -45,15 +46,15 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
name: "valid AWS S3 sink with auth and region",
spec: IntegrationSinkSpec{
Aws: &Aws{
- S3: &common.AWSS3{
- AWSCommon: common.AWSCommon{
+ S3: &v1alpha1.AWSS3{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "us-east-1",
},
- BucketNameOrArn: "example-bucket",
+ Arn: "example-bucket",
},
- Auth: &common.Auth{
- Secret: &common.Secret{
- Ref: &common.SecretReference{
+ Auth: &v1alpha1.Auth{
+ Secret: &v1alpha1.Secret{
+ Ref: &v1alpha1.SecretReference{
Name: "aws-secret",
},
},
@@ -66,15 +67,15 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
name: "valid AWS SQS sink with auth and region",
spec: IntegrationSinkSpec{
Aws: &Aws{
- SQS: &common.AWSSQS{
- AWSCommon: common.AWSCommon{
+ SQS: &v1alpha1.AWSSQS{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "us-east-1",
},
- QueueNameOrArn: "example-queue",
+ Arn: "example-queue",
},
- Auth: &common.Auth{
- Secret: &common.Secret{
- Ref: &common.SecretReference{
+ Auth: &v1alpha1.Auth{
+ Secret: &v1alpha1.Secret{
+ Ref: &v1alpha1.SecretReference{
Name: "aws-secret",
},
},
@@ -91,11 +92,11 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
ShowHeaders: true,
},
Aws: &Aws{
- S3: &common.AWSS3{
- AWSCommon: common.AWSCommon{
+ S3: &v1alpha1.AWSS3{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "us-east-1",
},
- BucketNameOrArn: "example-bucket",
+ Arn: "example-bucket",
},
},
},
@@ -105,21 +106,21 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
name: "multiple AWS sinks set (invalid)",
spec: IntegrationSinkSpec{
Aws: &Aws{
- S3: &common.AWSS3{
- AWSCommon: common.AWSCommon{
+ S3: &v1alpha1.AWSS3{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "us-east-1",
},
- BucketNameOrArn: "example-bucket",
+ Arn: "example-bucket",
},
- SQS: &common.AWSSQS{
- AWSCommon: common.AWSCommon{
+ SQS: &v1alpha1.AWSSQS{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "us-east-1",
},
- QueueNameOrArn: "example-queue",
+ Arn: "example-queue",
},
- Auth: &common.Auth{
- Secret: &common.Secret{
- Ref: &common.SecretReference{
+ Auth: &v1alpha1.Auth{
+ Secret: &v1alpha1.Secret{
+ Ref: &v1alpha1.SecretReference{
Name: "aws-secret",
},
},
@@ -132,14 +133,14 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
name: "AWS SQS sink without QueueNameOrArn (invalid)",
spec: IntegrationSinkSpec{
Aws: &Aws{
- SQS: &common.AWSSQS{
- AWSCommon: common.AWSCommon{
+ SQS: &v1alpha1.AWSSQS{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "us-east-1",
},
},
- Auth: &common.Auth{
- Secret: &common.Secret{
- Ref: &common.SecretReference{
+ Auth: &v1alpha1.Auth{
+ Secret: &v1alpha1.Secret{
+ Ref: &v1alpha1.SecretReference{
Name: "aws-secret",
},
},
@@ -157,11 +158,11 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
name: "AWS sink without auth (invalid)",
spec: IntegrationSinkSpec{
Aws: &Aws{
- S3: &common.AWSS3{
- AWSCommon: common.AWSCommon{
+ S3: &v1alpha1.AWSS3{
+ AWSCommon: v1alpha1.AWSCommon{
Region: "us-east-1",
},
- BucketNameOrArn: "example-bucket",
+ Arn: "example-bucket",
},
},
},
@@ -171,12 +172,12 @@ func TestIntegrationSinkSpecValidation(t *testing.T) {
name: "AWS S3 sink without region (invalid)",
spec: IntegrationSinkSpec{
Aws: &Aws{
- S3: &common.AWSS3{
- BucketNameOrArn: "example-bucket",
+ S3: &v1alpha1.AWSS3{
+ Arn: "example-bucket",
},
- Auth: &common.Auth{
- Secret: &common.Secret{
- Ref: &common.SecretReference{
+ Auth: &v1alpha1.Auth{
+ Secret: &v1alpha1.Secret{
+ Ref: &v1alpha1.SecretReference{
Name: "aws-secret",
},
},
diff --git a/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go
index 99dadeaaee1..3dff23adf1d 100644
--- a/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/sinks/v1alpha1/zz_generated.deepcopy.go
@@ -24,7 +24,7 @@ package v1alpha1
import (
v1 "k8s.io/api/batch/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
- common "knative.dev/eventing/pkg/apis/common"
+ integrationv1alpha1 "knative.dev/eventing/pkg/apis/common/integration/v1alpha1"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@@ -32,17 +32,17 @@ func (in *Aws) DeepCopyInto(out *Aws) {
*out = *in
if in.S3 != nil {
in, out := &in.S3, &out.S3
- *out = new(common.AWSS3)
+ *out = new(integrationv1alpha1.AWSS3)
**out = **in
}
if in.SQS != nil {
in, out := &in.SQS, &out.SQS
- *out = new(common.AWSSQS)
+ *out = new(integrationv1alpha1.AWSSQS)
**out = **in
}
if in.Auth != nil {
in, out := &in.Auth, &out.Auth
- *out = new(common.Auth)
+ *out = new(integrationv1alpha1.Auth)
(*in).DeepCopyInto(*out)
}
return
diff --git a/pkg/reconciler/inmemorychannel/controller/resources/service.go b/pkg/reconciler/inmemorychannel/controller/resources/service.go
index 80935ebd70f..fc27155b300 100644
--- a/pkg/reconciler/inmemorychannel/controller/resources/service.go
+++ b/pkg/reconciler/inmemorychannel/controller/resources/service.go
@@ -52,7 +52,6 @@ func ExternalService(namespace, service string) K8sServiceOption {
}
// NewK8sService creates a new Service for a Channel resource. It also sets the appropriate
-
// OwnerReferences on the resource so handleObject can discover the Channel resource that 'owns' it.
// As well as being garbage collected when the Channel is deleted.
func NewK8sService(imc *v1.InMemoryChannel, opts ...K8sServiceOption) (*corev1.Service, error) {
diff --git a/pkg/reconciler/integration/sink/controller.go b/pkg/reconciler/integration/sink/controller.go
index 3dda75718a9..a50e2e8c9ac 100644
--- a/pkg/reconciler/integration/sink/controller.go
+++ b/pkg/reconciler/integration/sink/controller.go
@@ -1,7 +1,24 @@
+/*
+Copyright 2024 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
package sink
import (
"context"
+
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
v1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1"
diff --git a/pkg/reconciler/integration/sink/integrationsink.go b/pkg/reconciler/integration/sink/integrationsink.go
index 5851f667301..fcce9483eb3 100644
--- a/pkg/reconciler/integration/sink/integrationsink.go
+++ b/pkg/reconciler/integration/sink/integrationsink.go
@@ -1,8 +1,28 @@
+/*
+Copyright 2024 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
package sink
import (
"context"
"fmt"
+
+ v1 "k8s.io/api/apps/v1"
+ "k8s.io/apimachinery/pkg/api/equality"
+
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -27,6 +47,15 @@ import (
"knative.dev/pkg/reconciler"
)
+const (
+ // Name of the corev1.Events emitted from the reconciliation process
+ sinkReconciled = "IntegrationSinkReconciled"
+ deploymentCreated = "DeploymentCreated"
+ deploymentUpdated = "DeploymentUpdated"
+ serviceCreated = "ServiceCreated"
+ serviceUpdated = "ServiceUpdated"
+)
+
type Reconciler struct {
secretLister corev1listers.SecretLister
eventPolicyLister eventingv1alpha1listers.EventPolicyLister
@@ -39,15 +68,23 @@ type Reconciler struct {
systemNamespace string
}
+// newReconciledNormal makes a new reconciler event with event type Normal, and
+// reason IntegrationSink.
+func newReconciledNormal(namespace, name string) reconciler.Event {
+ return reconciler.NewEvent(corev1.EventTypeNormal, sinkReconciled, "IntegrationSink reconciled: \"%s/%s\"", namespace, name)
+}
+
func (r *Reconciler) ReconcileKind(ctx context.Context, sink *sinks.IntegrationSink) reconciler.Event {
featureFlags := feature.FromContext(ctx)
- if err := r.reconcileDeployment(ctx, sink); err != nil {
+ _, err := r.reconcileDeployment(ctx, sink)
+ if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Pod", zap.Error(err))
return err
}
- if err := r.reconcileService(ctx, sink); err != nil {
+ _, err = r.reconcileService(ctx, sink)
+ if err != nil {
logging.FromContext(ctx).Errorw("Error reconciling Service", zap.Error(err))
return err
}
@@ -56,65 +93,63 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, sink *sinks.IntegrationS
return fmt.Errorf("failed to reconcile address: %w", err)
}
- err := auth.UpdateStatusWithEventPolicies(featureFlags, &sink.Status.AppliedEventPoliciesStatus, &sink.Status, r.eventPolicyLister, sinks.SchemeGroupVersion.WithKind("IntegrationSink"), sink.ObjectMeta)
+ err = auth.UpdateStatusWithEventPolicies(featureFlags, &sink.Status.AppliedEventPoliciesStatus, &sink.Status, r.eventPolicyLister, sinks.SchemeGroupVersion.WithKind("IntegrationSink"), sink.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update IntegrationSink status with EventPolicies: %v", err)
}
- return nil
+ return newReconciledNormal(sink.Namespace, sink.Name)
}
-func (r *Reconciler) reconcileDeployment(ctx context.Context, sink *sinks.IntegrationSink) error {
-
- //updatedSink := sink.DeepCopy()
+func (r *Reconciler) reconcileDeployment(ctx context.Context, sink *sinks.IntegrationSink) (*v1.Deployment, error) {
expected := resources.MakeDeploymentSpec(sink)
- pod, err := r.deploymentLister.Deployments(sink.Namespace).Get(expected.Name)
+ deployment, err := r.deploymentLister.Deployments(sink.Namespace).Get(expected.Name)
if apierrors.IsNotFound(err) {
- pod, err = r.kubeClientSet.AppsV1().Deployments(sink.Namespace).Create(ctx, expected, metav1.CreateOptions{})
+ deployment, err = r.kubeClientSet.AppsV1().Deployments(sink.Namespace).Create(ctx, expected, metav1.CreateOptions{})
if err != nil {
- return fmt.Errorf("creating new Deployment: %v", err)
+ return nil, fmt.Errorf("creating new Deployment: %v", err)
}
- controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, "FIXME___reconciled", "Deployment created %q", pod.Name)
- //sink.Status.PropagateDeploymentAvailability(&pod.Status)
-
+ controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, deploymentCreated, "Deployment created %q", deployment.Name)
} else if err != nil {
- return fmt.Errorf("getting Deployment: %v", err)
- } else if !metav1.IsControlledBy(pod, sink) {
- return fmt.Errorf("pod %q is not owned by KameletSink %q", pod.Name, sink.Name)
- } else {
- logging.FromContext(ctx).Debugw("Reusing existing Deployment", zap.Any("Pod", pod))
+ return nil, fmt.Errorf("getting Deployment: %v", err)
+ } else if !metav1.IsControlledBy(deployment, sink) {
+ return nil, fmt.Errorf("Deployment %q is not owned by IntegrationSink %q", deployment.Name, sink.Name)
+ } else if r.podSpecChanged(deployment.Spec.Template.Spec, expected.Spec.Template.Spec) {
+ deployment.Spec.Template.Spec = expected.Spec.Template.Spec
+ deployment, err = r.kubeClientSet.AppsV1().Deployments(sink.Namespace).Update(ctx, deployment, metav1.UpdateOptions{})
+ if err != nil {
+ return nil, fmt.Errorf("updating Deployment: %v", err)
+ }
+ controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, deploymentUpdated, "Deployment %q updated", deployment.Name)
+ } else {
+ logging.FromContext(ctx).Debugw("Reusing existing Deployment", zap.Any("Deployment", deployment))
}
- ////logging.FromContext(ctx).Infow("Reusing existing Deployment", zap.Any("Pod", pod))
- ////logging.FromContext(ctx).Infow("Reusing existing Deployment", zap.Any("Status", pod.Status))
- ////logging.FromContext(ctx).Infow("Reusing existing Deployment", zap.Any("sink", sink))
- ////logging.FromContext(ctx).Infow("Reusing existing Deployment", zap.Any("sink-Status", sink.Status))
- //
- sink.Status.PropagateDeploymentAvailability(&pod.Status)
- return nil
+ sink.Status.PropagateDeploymentStatus(&deployment.Status)
+ return deployment, nil
}
-func (r *Reconciler) reconcileService(ctx context.Context, sink *sinks.IntegrationSink) error {
+func (r *Reconciler) reconcileService(ctx context.Context, sink *sinks.IntegrationSink) (*corev1.Service, error) {
expected := resources.MakeService(sink)
svc, err := r.serviceLister.Services(sink.Namespace).Get(expected.Name)
if apierrors.IsNotFound(err) {
svc, err := r.kubeClientSet.CoreV1().Services(sink.Namespace).Create(ctx, expected, metav1.CreateOptions{})
if err != nil {
- return fmt.Errorf("creating new Service: %v", err)
+ return nil, fmt.Errorf("creating new Service: %v", err)
}
- controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, "FIXME___reconciled", "Service created %q", svc.Name)
+ controller.GetEventRecorder(ctx).Eventf(sink, corev1.EventTypeNormal, serviceCreated, "Service created %q", svc.Name)
} else if err != nil {
- return fmt.Errorf("getting Service : %v", err)
+ return nil, fmt.Errorf("getting Service : %v", err)
} else if !metav1.IsControlledBy(svc, sink) {
- return fmt.Errorf("service %q is not owned by KameletSink %q", svc.Name, sink.Name)
+ return nil, fmt.Errorf("Service %q is not owned by IntegrationSink %q", svc.Name, sink.Name)
} else {
logging.FromContext(ctx).Debugw("Reusing existing Service", zap.Any("Service", svc))
}
- return nil
+ return svc, nil
}
func (r *Reconciler) reconcileAddress(ctx context.Context, sink *sinks.IntegrationSink) error {
@@ -194,7 +229,6 @@ func (r *Reconciler) httpAddress(sink *sinks.IntegrationSink) duckv1.Addressable
URL: &apis.URL{
Scheme: "http",
Host: network.GetServiceHostname(sink.GetName()+"-deployment", sink.GetNamespace()),
- // Path: fmt.Sprintf("/%s/%s", sink.GetNamespace(), sink.GetName()),
},
}
return httpAddress
@@ -206,3 +240,18 @@ func (r *Reconciler) httpsAddress(certs *string, sink *sinks.IntegrationSink) du
addr.CACerts = certs
return addr
}
+
+func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool {
+ if !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) {
+ return true
+ }
+ if len(oldPodSpec.Containers) != len(newPodSpec.Containers) {
+ return true
+ }
+ for i := range newPodSpec.Containers {
+ if !equality.Semantic.DeepEqual(newPodSpec.Containers[i].Env, oldPodSpec.Containers[i].Env) {
+ return true
+ }
+ }
+ return false
+}
diff --git a/pkg/reconciler/integration/sink/integrationsink_test.go b/pkg/reconciler/integration/sink/integrationsink_test.go
new file mode 100644
index 00000000000..a95c86496ee
--- /dev/null
+++ b/pkg/reconciler/integration/sink/integrationsink_test.go
@@ -0,0 +1,321 @@
+/*
+Copyright 2024 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package sink
+
+import (
+ "fmt"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/utils/ptr"
+ "knative.dev/pkg/apis"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
+ "knative.dev/pkg/network"
+
+ appsv1 "k8s.io/api/apps/v1"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ clientgotesting "k8s.io/client-go/testing"
+ sinksv1alpha1 "knative.dev/eventing/pkg/apis/sinks/v1alpha1"
+ fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
+ "knative.dev/eventing/pkg/client/injection/reconciler/sinks/v1alpha1/integrationsink"
+ "knative.dev/eventing/pkg/reconciler/integration/sink/resources"
+ fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
+ "knative.dev/pkg/kmeta"
+ "knative.dev/pkg/logging"
+
+ "context"
+
+ . "knative.dev/eventing/pkg/reconciler/testing/v1"
+ . "knative.dev/eventing/pkg/reconciler/testing/v1alpha1"
+
+ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
+ "knative.dev/pkg/configmap"
+ "knative.dev/pkg/controller"
+ logtesting "knative.dev/pkg/logging/testing"
+ . "knative.dev/pkg/reconciler/testing"
+
+ "testing"
+)
+
+const (
+ // testNamespace is the namespace used for testing.
+ sinkName = "test-integration-sink"
+ sinkUID = "1234-5678-90"
+ testNS = "test-namespace"
+)
+
+var (
+ conditionTrue = corev1.ConditionTrue
+ deploymentName = fmt.Sprintf("%s-deployment", sinkName)
+
+ sinkAddressable = duckv1.Addressable{
+ Name: ptr.To("http"),
+ URL: &apis.URL{
+ Scheme: "http",
+ Host: network.GetServiceHostname(deploymentName, testNS),
+ },
+ }
+)
+
+func TestReconcile(t *testing.T) {
+
+ table := TableTest{
+ {
+ Name: "bad work queue key",
+ Key: "too/many/parts",
+ },
+ {
+ Name: "key not found",
+ // Make sure Reconcile handles good keys that don't exist.
+ Key: "foo/not-found",
+ }, {
+ Name: "error creating deployment",
+ Objects: []runtime.Object{
+ NewIntegrationSink(sinkName, testNS,
+ WithIntegrationSinkUID(sinkUID),
+ WithIntegrationSinkSpec(makeIntegrationSinkSpec()),
+ ),
+ },
+ Key: testNS + "/" + sinkName,
+ WithReactors: []clientgotesting.ReactionFunc{
+ InduceFailure("create", "deployments"),
+ },
+ WantEvents: []string{
+ Eventf(corev1.EventTypeWarning, "InternalError", "creating new Deployment: inducing failure for %s %s", "create", "deployments"),
+ },
+ WantErr: true,
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: NewIntegrationSink(sinkName, testNS,
+ WithIntegrationSinkUID(sinkUID),
+ WithIntegrationSinkSpec(makeIntegrationSinkSpec()),
+ WithInitIntegrationSinkConditions,
+ ),
+ }},
+ WantCreates: []runtime.Object{
+ makeDeployment(NewIntegrationSink(sinkName, testNS,
+ WithIntegrationSinkUID(sinkUID),
+ WithIntegrationSinkSpec(makeIntegrationSinkSpec())),
+ nil),
+ },
+ }, {
+ Name: "successfully reconciled and ready",
+ Objects: []runtime.Object{
+ NewIntegrationSink(sinkName, testNS,
+ WithIntegrationSinkUID(sinkUID),
+ WithIntegrationSinkSpec(makeIntegrationSinkSpec()),
+ ),
+ makeDeployment(NewIntegrationSink(sinkName, testNS,
+ WithIntegrationSinkUID(sinkUID),
+ WithIntegrationSinkSpec(makeIntegrationSinkSpec())),
+ &conditionTrue),
+ makeService(deploymentName, testNS),
+ },
+ Key: testNS + "/" + sinkName,
+ WantEvents: []string{
+ Eventf(corev1.EventTypeNormal, sinkReconciled, `IntegrationSink reconciled: "%s/%s"`, testNS, sinkName),
+ },
+ WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
+ Object: NewIntegrationSink(sinkName, testNS,
+ WithIntegrationSinkUID(sinkUID),
+ WithIntegrationSinkAddressableReady(),
+ WithIntegrationSinkAddress(&sinkAddressable),
+ WithIntegrationSinkSpec(makeIntegrationSinkSpec()),
+ WithIntegrationSinkEventPoliciesReadyBecauseOIDCDisabled(),
+ WithInitIntegrationSinkConditions,
+ WithIntegrationSinkPropagateDeploymenteStatus(makeDeploymentStatus(&conditionTrue)),
+ ),
+ }},
+ }}
+
+ logger := logtesting.TestLogger(t)
+ table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler {
+ ctx = addressable.WithDuck(ctx)
+ r := &Reconciler{
+ kubeClientSet: fakekubeclient.Get(ctx),
+ deploymentLister: listers.GetDeploymentLister(),
+ serviceLister: listers.GetServiceLister(),
+ secretLister: listers.GetSecretLister(),
+ eventPolicyLister: listers.GetEventPolicyLister(),
+ systemNamespace: testNS,
+ }
+
+ return integrationsink.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetIntegrationSinkLister(), controller.GetEventRecorder(ctx), r)
+ },
+ true,
+ logger,
+ ))
+}
+
+func makeDeployment(sink *sinksv1alpha1.IntegrationSink, ready *corev1.ConditionStatus) runtime.Object {
+
+ status := appsv1.DeploymentStatus{}
+ if ready != nil {
+ status.Conditions = []appsv1.DeploymentCondition{
+ {
+ Type: appsv1.DeploymentAvailable,
+ Status: *ready,
+ },
+ }
+ if *ready == corev1.ConditionTrue {
+ status.ReadyReplicas = 1
+ }
+ }
+
+ d := &appsv1.Deployment{
+ TypeMeta: metav1.TypeMeta{
+ APIVersion: "apps/v1",
+ Kind: "Deployment",
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: deploymentName,
+ Namespace: sink.Namespace,
+ OwnerReferences: []metav1.OwnerReference{
+ *kmeta.NewControllerRef(sink),
+ },
+ Labels: resources.Labels(sink.Name),
+ },
+ Status: status,
+ Spec: appsv1.DeploymentSpec{
+ Selector: &metav1.LabelSelector{
+ MatchLabels: resources.Labels(sink.Name),
+ },
+ Template: corev1.PodTemplateSpec{
+ ObjectMeta: metav1.ObjectMeta{
+ Labels: resources.Labels(sink.Name),
+ },
+ Spec: corev1.PodSpec{
+ Containers: []corev1.Container{
+ {
+ Name: "sink",
+ Image: "gcr.io/knative-nightly/log-sink:latest",
+ ImagePullPolicy: corev1.PullIfNotPresent,
+ Ports: []corev1.ContainerPort{{
+ ContainerPort: 8080,
+ Protocol: corev1.ProtocolTCP,
+ Name: "http",
+ }},
+ Env: []corev1.EnvVar{
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_LEVEL",
+ Value: "info",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_LOGMASK",
+ Value: "false",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_MULTILINE",
+ Value: "false",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_SHOWALLPROPERTIES",
+ Value: "false",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_SHOWBODY",
+ Value: "true",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_SHOWBODYTYPE",
+ Value: "true",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_SHOWEXCHANGEPATTERN",
+ Value: "false",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_SHOWHEADERS",
+ Value: "true",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_SHOWPROPERTIES",
+ Value: "false",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_SHOWSTREAMS",
+ Value: "false",
+ },
+ {
+ Name: "CAMEL_KAMELET_LOG_SINK_SHOWCACHEDSTREAMS",
+ Value: "false",
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ return d
+}
+
+func makeService(name, namespace string) *corev1.Service {
+ return &corev1.Service{
+ TypeMeta: metav1.TypeMeta{
+ APIVersion: "v1",
+ Kind: "Service",
+ },
+
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: namespace,
+ Labels: resources.Labels(sinkName),
+ OwnerReferences: []metav1.OwnerReference{
+ {
+ APIVersion: "sinks.knative.dev/v1alpha1",
+ Kind: "IntegrationSink",
+ Name: sinkName,
+ UID: sinkUID,
+ Controller: ptr.To(true),
+ BlockOwnerDeletion: ptr.To(true),
+ },
+ },
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {
+ Name: "http",
+ Protocol: corev1.ProtocolTCP,
+ Port: 80,
+ TargetPort: intstr.IntOrString{IntVal: 8080},
+ },
+ },
+ Selector: resources.Labels(sinkName),
+ },
+ }
+}
+
+func makeIntegrationSinkSpec() sinksv1alpha1.IntegrationSinkSpec {
+ return sinksv1alpha1.IntegrationSinkSpec{
+ Log: &sinksv1alpha1.Log{
+ Level: "info",
+ ShowHeaders: true,
+ ShowBody: true,
+ ShowBodyType: true,
+ },
+ }
+}
+
+func makeDeploymentStatus(ready *corev1.ConditionStatus) *appsv1.DeploymentStatus {
+ return &appsv1.DeploymentStatus{
+ Conditions: []appsv1.DeploymentCondition{{
+ Type: appsv1.DeploymentAvailable,
+ Status: *ready,
+ }},
+ Replicas: 1,
+ }
+}
diff --git a/pkg/reconciler/integration/sink/resources/container_image.go b/pkg/reconciler/integration/sink/resources/container_image.go
index 31537a75016..1d6928429b8 100644
--- a/pkg/reconciler/integration/sink/resources/container_image.go
+++ b/pkg/reconciler/integration/sink/resources/container_image.go
@@ -1,3 +1,19 @@
+/*
+Copyright 2024 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
package resources
import (
@@ -10,8 +26,6 @@ import (
"knative.dev/pkg/kmeta"
)
-const componentSuffix = "_SINK"
-
func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink) *appsv1.Deployment {
deploy := &appsv1.Deployment{
diff --git a/pkg/reconciler/integration/sink/resources/labels.go b/pkg/reconciler/integration/sink/resources/labels.go
index 7c23dbcd77e..7a06aadb81c 100644
--- a/pkg/reconciler/integration/sink/resources/labels.go
+++ b/pkg/reconciler/integration/sink/resources/labels.go
@@ -1,11 +1,11 @@
/*
-Copyright 2019 The Knative Authors
+Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,15 +16,9 @@ limitations under the License.
package resources
-const (
- controllerAgentName = "kamelet-source-controller"
-)
-
func Labels(name string) map[string]string {
return map[string]string{
"app.kubernetes.io/name": name,
- //"app.kubernetes.io/version": "1.0-SNAPSHOT",
- //"eventing.knative.dev/connector": "timer-source",
}
}
diff --git a/pkg/reconciler/integration/source/integrationsource.go b/pkg/reconciler/integration/source/integrationsource.go
index 6cd63ba7b1d..ef94fd2fc89 100644
--- a/pkg/reconciler/integration/source/integrationsource.go
+++ b/pkg/reconciler/integration/source/integrationsource.go
@@ -22,8 +22,6 @@ import (
"knative.dev/eventing/pkg/reconciler/integration/source/resources"
- "knative.dev/eventing/pkg/reconciler/integration/source/resources"
-
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
diff --git a/pkg/reconciler/integration/source/integrationsource_test.go b/pkg/reconciler/integration/source/integrationsource_test.go
index d500c0cccbb..eee019f041d 100644
--- a/pkg/reconciler/integration/source/integrationsource_test.go
+++ b/pkg/reconciler/integration/source/integrationsource_test.go
@@ -79,7 +79,8 @@ func TestReconcile(t *testing.T) {
Name: "key not found",
// Make sure Reconcile handles good keys that don't exist.
Key: "foo/not-found",
- }, {
+ },
+ {
Name: "error creating containersource",
Objects: []runtime.Object{
NewIntegrationSource(sourceName, testNS,
@@ -160,12 +161,12 @@ func TestReconcile(t *testing.T) {
),
}},
}}
-
logger := logtesting.TestLogger(t)
table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler {
ctx = addressable.WithDuck(ctx)
r := &Reconciler{
+
kubeClientSet: fakekubeclient.Get(ctx),
eventingClientSet: fakeeventingclient.Get(ctx),
containerSourceLister: listers.GetContainerSourceLister(),
diff --git a/pkg/reconciler/testing/v1/listers.go b/pkg/reconciler/testing/v1/listers.go
index a6d0733136a..3ba7f5c833b 100644
--- a/pkg/reconciler/testing/v1/listers.go
+++ b/pkg/reconciler/testing/v1/listers.go
@@ -124,6 +124,10 @@ func (l *Listers) GetEventPolicyLister() eventingv1alpha1listers.EventPolicyList
return eventingv1alpha1listers.NewEventPolicyLister(l.indexerFor(&eventingv1alpha1.EventPolicy{}))
}
+func (l *Listers) GetIntegrationSinkLister() sinkslisters.IntegrationSinkLister {
+ return sinkslisters.NewIntegrationSinkLister(l.indexerFor(&sinksv1alpha1.IntegrationSink{}))
+}
+
func (l *Listers) GetJobSinkLister() sinkslisters.JobSinkLister {
return sinkslisters.NewJobSinkLister(l.indexerFor(&sinksv1alpha1.JobSink{}))
}
diff --git a/pkg/reconciler/testing/v1alpha1/integrationsink.go b/pkg/reconciler/testing/v1alpha1/integrationsink.go
new file mode 100644
index 00000000000..43d80a8f99b
--- /dev/null
+++ b/pkg/reconciler/testing/v1alpha1/integrationsink.go
@@ -0,0 +1,77 @@
+package v1alpha1
+
+import (
+ "context"
+ "knative.dev/eventing/pkg/apis/feature"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
+
+ appsv1 "k8s.io/api/apps/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "knative.dev/eventing/pkg/apis/sinks/v1alpha1"
+)
+
+// IntegrationSinkOption enables further configuration of a IntegrationSink.
+type IntegrationSinkOption func(source *v1alpha1.IntegrationSink)
+
+// NewIntegrationSink creates a v1 IntegrationSink with IntegrationSinkOptions
+func NewIntegrationSink(name, namespace string, o ...IntegrationSinkOption) *v1alpha1.IntegrationSink {
+ s := &v1alpha1.IntegrationSink{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Namespace: namespace,
+ },
+ }
+ for _, opt := range o {
+ opt(s)
+ }
+ s.SetDefaults(context.Background())
+ return s
+}
+
+func WithIntegrationSinkUID(uid types.UID) IntegrationSinkOption {
+ return func(s *v1alpha1.IntegrationSink) {
+ s.UID = uid
+ }
+}
+
+// WithInitIntegrationSinkConditions initializes the IntegrationSink's conditions.
+func WithInitIntegrationSinkConditions(s *v1alpha1.IntegrationSink) {
+ s.Status.InitializeConditions()
+}
+
+func WithIntegrationSinkStatusObservedGeneration(generation int64) IntegrationSinkOption {
+ return func(s *v1alpha1.IntegrationSink) {
+ s.Status.ObservedGeneration = generation
+ }
+}
+
+func WithIntegrationSinkPropagateDeploymenteStatus(status *appsv1.DeploymentStatus) IntegrationSinkOption {
+ return func(s *v1alpha1.IntegrationSink) {
+ s.Status.PropagateDeploymentStatus(status)
+ }
+}
+
+func WithIntegrationSinkAddressableReady() IntegrationSinkOption {
+ return func(s *v1alpha1.IntegrationSink) {
+ s.Status.MarkAddressableReady()
+ }
+}
+
+func WithIntegrationSinkAddress(addr *duckv1.Addressable) IntegrationSinkOption {
+ return func(s *v1alpha1.IntegrationSink) {
+ s.Status.SetAddress(addr)
+ }
+}
+
+func WithIntegrationSinkSpec(spec v1alpha1.IntegrationSinkSpec) IntegrationSinkOption {
+ return func(s *v1alpha1.IntegrationSink) {
+ s.Spec = spec
+ }
+}
+
+func WithIntegrationSinkEventPoliciesReadyBecauseOIDCDisabled() IntegrationSinkOption {
+ return func(s *v1alpha1.IntegrationSink) {
+ s.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication)
+ }
+}
diff --git a/test/rekt/features/integrationsink/features.go b/test/rekt/features/integrationsink/features.go
index fd6c71132cd..b80628412d6 100644
--- a/test/rekt/features/integrationsink/features.go
+++ b/test/rekt/features/integrationsink/features.go
@@ -17,6 +17,8 @@ limitations under the License.
package integrationsink
import (
+ "time"
+
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"knative.dev/eventing/test/rekt/features/featureflags"
@@ -25,7 +27,6 @@ import (
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
- "time"
)
func Success() *feature.Feature {
@@ -35,12 +36,10 @@ func Success() *feature.Feature {
integrationSink := feature.MakeRandomK8sName("integrationsink")
source := feature.MakeRandomK8sName("source")
- //sinkURL := &apis.URL{Scheme: "http", Host: sink}
-
event := cetest.FullEvent()
event.SetID(uuid.NewString())
- f.Setup("install integration sink", integrationsink.Install(integrationSink)) //, integrationsink.WithForwarderJob(sinkURL.String())))
+ f.Setup("install integration sink", integrationsink.Install(integrationSink))
f.Setup("integrationsink is addressable", integrationsink.IsAddressable(integrationSink))
f.Setup("integrationsink is ready", integrationsink.IsReady(integrationSink))
@@ -51,21 +50,6 @@ func Success() *feature.Feature {
eventshub.AddSequence,
eventshub.SendMultipleEvents(2, time.Millisecond)))
- //f.Requirement("install source for ksink", eventshub.Install(source,
- // eventshub.StartSenderToResource(integrationsink.GVR(), integrationSink),
- // eventshub.InputEvent(cetest.FullEvent()),
- // eventshub.AddSequence,
- // eventshub.SendMultipleEvents(100, time.Millisecond)))
- //
- //f.Requirement("install source", eventshub.Install(source,
- // eventshub.StartSenderToResource(integrationsink.GVR(), integrationSink),
- // eventshub.InputEvent(event)))
-
- //f.Assert("Job is created with the mounted event", assert.OnStore(sink).
- // MatchReceivedEvent(cetest.HasId(event.ID())).
- // AtLeast(1),
- //)
- //
f.Assert("Source sent the event", assert.OnStore(source).
Match(assert.MatchKind(eventshub.EventResponse)).
Match(assert.MatchStatusCode(204)).
diff --git a/test/rekt/integration_sink_test.go b/test/rekt/integration_sink_test.go
index f0d595735f9..a05b38a1995 100644
--- a/test/rekt/integration_sink_test.go
+++ b/test/rekt/integration_sink_test.go
@@ -20,10 +20,9 @@ limitations under the License.
package rekt
import (
- "knative.dev/eventing/test/rekt/features/integrationsink"
- "knative.dev/reconciler-test/pkg/eventshub"
"testing"
+ "knative.dev/eventing/test/rekt/features/integrationsink"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/k8s"
@@ -43,52 +42,3 @@ func TestIntegrationSinkSuccess(t *testing.T) {
env.Test(ctx, t, integrationsink.Success())
}
-
-func TestIntegrationSinkSuccessTLS(t *testing.T) {
- t.Parallel()
-
- ctx, env := global.Environment(
- knative.WithKnativeNamespace(system.Namespace()),
- knative.WithLoggingConfig,
- knative.WithTracingConfig,
- k8s.WithEventListener,
- eventshub.WithTLS(t),
- environment.Managed(t),
- )
-
- env.Test(ctx, t, integrationsink.SuccessTLS())
-}
-
-//
-//func TestJobSinkOIDC(t *testing.T) {
-// t.Parallel()
-//
-// ctx, env := global.Environment(
-// knative.WithKnativeNamespace(system.Namespace()),
-// knative.WithLoggingConfig,
-// knative.WithTracingConfig,
-// k8s.WithEventListener,
-// eventshub.WithTLS(t),
-// environment.Managed(t),
-// )
-//
-// env.Test(ctx, t, jobsink.OIDC())
-//}
-//
-//func TestJobSinkSupportsAuthZ(t *testing.T) {
-// t.Parallel()
-//
-// ctx, env := global.Environment(
-// knative.WithKnativeNamespace(system.Namespace()),
-// knative.WithLoggingConfig,
-// knative.WithTracingConfig,
-// k8s.WithEventListener,
-// eventshub.WithTLS(t),
-// environment.Managed(t),
-// )
-//
-// name := feature.MakeRandomK8sName("jobsink")
-// env.Prerequisite(ctx, t, jsresource.GoesReadySimple(name))
-//
-// env.TestSet(ctx, t, authz.AddressableAuthZConformance(jsresource.GVR(), "JobSink", name))
-//}
diff --git a/test/rekt/resources/integrationsink/integrationsink.go b/test/rekt/resources/integrationsink/integrationsink.go
index f2eaddc1da1..6ed054918f7 100644
--- a/test/rekt/resources/integrationsink/integrationsink.go
+++ b/test/rekt/resources/integrationsink/integrationsink.go
@@ -3,6 +3,8 @@ package integrationsink
import (
"context"
"embed"
+ "time"
+
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
@@ -10,7 +12,6 @@ import (
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
"knative.dev/reconciler-test/pkg/manifest"
- "time"
)
//go:embed integrationsink.yaml
diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go
deleted file mode 100644
index d547fef8f95..00000000000
--- a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-Copyright 2022 The Knative Authors
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Code generated by injection-gen. DO NOT EDIT.
-
-package pod
-
-import (
- context "context"
-
- v1 "k8s.io/client-go/informers/core/v1"
- factory "knative.dev/pkg/client/injection/kube/informers/factory"
- controller "knative.dev/pkg/controller"
- injection "knative.dev/pkg/injection"
- logging "knative.dev/pkg/logging"
-)
-
-func init() {
- injection.Default.RegisterInformer(withInformer)
-}
-
-// Key is used for associating the Informer inside the context.Context.
-type Key struct{}
-
-func withInformer(ctx context.Context) (context.Context, controller.Informer) {
- f := factory.Get(ctx)
- inf := f.Core().V1().Pods()
- return context.WithValue(ctx, Key{}, inf), inf.Informer()
-}
-
-// Get extracts the typed informer from the context.
-func Get(ctx context.Context) v1.PodInformer {
- untyped := ctx.Value(Key{})
- if untyped == nil {
- logging.FromContext(ctx).Panic(
- "Unable to fetch k8s.io/client-go/informers/core/v1.PodInformer from context.")
- }
- return untyped.(v1.PodInformer)
-}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 74c4722751b..0baeb6a0025 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1130,7 +1130,6 @@ knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints
knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints/fake
knative.dev/pkg/client/injection/kube/informers/core/v1/namespace
knative.dev/pkg/client/injection/kube/informers/core/v1/namespace/fake
-knative.dev/pkg/client/injection/kube/informers/core/v1/pod
knative.dev/pkg/client/injection/kube/informers/core/v1/secret/filtered
knative.dev/pkg/client/injection/kube/informers/core/v1/service
knative.dev/pkg/client/injection/kube/informers/core/v1/service/fake
|