diff --git a/pkg/apis/messaging/v1/in_memory_channel_types.go b/pkg/apis/messaging/v1/in_memory_channel_types.go index d45d1a971b4..35ef3988aed 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_types.go +++ b/pkg/apis/messaging/v1/in_memory_channel_types.go @@ -19,10 +19,11 @@ package v1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" ) // +genclient @@ -44,6 +45,14 @@ type InMemoryChannel struct { Status InMemoryChannelStatus `json:"status,omitempty"` } +var ( + // AsyncHandlerAnnotation controls whether InMemoryChannel uses the async handler. + // + // Async handler is subject to event loss since it responds with 200 before forwarding the event + // to all subscriptions. + AsyncHandlerAnnotation = SchemeGroupVersion.Group + "/async-handler" +) + var ( // Check that InMemoryChannel can be validated and defaulted. _ apis.Validatable = (*InMemoryChannel)(nil) diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 7aa80acd0c3..d307b79d99a 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -60,8 +60,11 @@ type Subscription struct { // Config for a fanout.EventHandler. type Config struct { Subscriptions []Subscription `json:"subscriptions"` - // Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. + // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. // It is expected to be false when used as a sidecar. + // + // Async handler is subject to event loss since it responds with 200 before forwarding the event + // to all subscriptions. AsyncHandler bool `json:"asyncHandler,omitempty"` } diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 46e2bb32b0d..3550f73ab93 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "strings" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -108,10 +109,20 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk return err } - var tmpChannelableSpec duckv1.ChannelableSpec = duckv1.ChannelableSpec{ + var tmpChannelableSpec = duckv1.ChannelableSpec{ Delivery: b.Spec.Delivery, } + metadata := b.ObjectMeta.DeepCopy() + channelAnnotations := map[string]string{ + eventing.ScopeAnnotationKey: eventing.ScopeCluster, + } + for k, v := range metadata.GetAnnotations() { + if strings.HasPrefix(k, messagingv1.SchemeGroupVersion.Group) { + channelAnnotations[k] = v + } + } + logging.FromContext(ctx).Infow("Reconciling the trigger channel") c, err := ducklib.NewPhysicalChannel( chanMan.template.TypeMeta, @@ -122,7 +133,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, b *eventingv1.Broker) pk *kmeta.NewControllerRef(b), }, Labels: TriggerChannelLabels(b.Name, b.Namespace), - Annotations: map[string]string{eventing.ScopeAnnotationKey: eventing.ScopeCluster}, + Annotations: channelAnnotations, }, ducklib.WithChannelableSpec(tmpChannelableSpec), ducklib.WithPhysicalChannelSpec(chanMan.template.Spec), @@ -392,7 +403,9 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf return nil, fmt.Errorf("failed to convert %s/%s into Channelable: %w", channelObjRef.Namespace, channelObjRef.Name, err) } - if equality.Semantic.DeepEqual(desired.Spec.Delivery, channelable.Spec.Delivery) { + if equality.Semantic.DeepDerivative(desired.Spec.Delivery, channelable.Spec.Delivery) && + equality.Semantic.DeepDerivative(desired.Annotations, channelable.Annotations) && + equality.Semantic.DeepDerivative(desired.Labels, channelable.Labels) { // If propagated/mutable properties match return the Channel. return channelable, nil } @@ -402,12 +415,20 @@ func (r *Reconciler) reconcileChannel(ctx context.Context, channelResourceInterf jsonPatch, err := duckapis.CreatePatch( // Existing Channel properties duckv1.Channelable{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: channelable.Annotations, + Labels: channelable.Labels, + }, Spec: duckv1.ChannelableSpec{ Delivery: channelable.Spec.Delivery, }, }, // Desired Channel properties duckv1.Channelable{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: desired.Annotations, + Labels: desired.Labels, + }, Spec: duckv1.ChannelableSpec{ Delivery: desired.Spec.Delivery, }, diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index 3081a50c299..01ed141ea68 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -48,6 +48,7 @@ import ( eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/feature" + v1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/auth" fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" @@ -405,6 +406,48 @@ func TestReconcile(t *testing.T) { WithDLSNotConfigured(), WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), }}, + }, { + Name: "Propagate annotations", + Key: testKey, + Objects: []runtime.Object{ + makeDLSServiceAsUnstructured(), + NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithInitBrokerConditions, + WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true")), + createChannel(withChannelReady), + imcConfigMap(), + NewEndpoints(filterServiceName, systemNS, + WithEndpointsLabels(FilterLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + NewEndpoints(ingressServiceName, systemNS, + WithEndpointsLabels(IngressLabels()), + WithEndpointsAddresses(corev1.EndpointAddress{IP: "127.0.0.1"})), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewBroker(brokerName, testNS, + WithBrokerClass(eventing.MTChannelBrokerClassValue), + WithBrokerConfig(config()), + WithBrokerAnnotation(v1.AsyncHandlerAnnotation, "true"), + WithBrokerReady, + WithDLSNotConfigured(), + WithBrokerAddressURI(brokerAddress), + WithChannelAddressAnnotation(triggerChannelURL), + WithChannelAPIVersionAnnotation(triggerChannelAPIVersion), + WithChannelKindAnnotation(triggerChannelKind), + WithChannelNameAnnotation(triggerChannelName), + WithBrokerEventPoliciesReadyBecauseOIDCDisabled()), + }}, + WantPatches: []clientgotesting.PatchActionImpl{ + { + ActionImpl: clientgotesting.ActionImpl{ + Namespace: testNS, + }, + Name: fmt.Sprintf("%s-kne-trigger", brokerName), + Patch: []byte(`[{"op":"add","path":"/metadata/annotations/messaging.knative.dev~1async-handler","value":"` + "true" + `"}]`), + }, + }, }, { Name: "Successful Reconciliation with a Channel with CA certs", Key: testKey, diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 8f663491d65..7dec76f9363 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -19,6 +19,7 @@ package dispatcher import ( "context" "fmt" + "strconv" listers "knative.dev/eventing/pkg/client/listers/messaging/v1" @@ -250,13 +251,21 @@ func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) ( subs[i] = *conf } + async := false + if v, ok := imc.Annotations[v1.AsyncHandlerAnnotation]; ok { + b, err := strconv.ParseBool(v) + if err == nil { + async = b + } + } + return &multichannelfanout.ChannelConfig{ Namespace: imc.Namespace, Name: imc.Name, HostName: imc.Status.Address.URL.Host, Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name), FanoutConfig: fanout.Config{ - AsyncHandler: false, + AsyncHandler: async, Subscriptions: subs, }, }, nil diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index ea85a6fc9a6..ac83622b395 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -19,11 +19,13 @@ package dispatcher import ( "context" "net/http" + "reflect" "testing" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" @@ -662,3 +664,106 @@ func (f *fakeMultiChannelHandler) GetChannelHandler(host string) fanout.EventHan func (f *fakeMultiChannelHandler) CountChannelHandlers() int { return len(f.handlers) } + +func Test_newConfigForInMemoryChannelAsyncHandler(t *testing.T) { + ctx, _ := SetupFakeContext(t, SetUpInformerSelector) + + type args struct { + ctx context.Context + imc *v1.InMemoryChannel + } + tests := []struct { + name string + args args + wantAsync bool + wantErr bool + }{ + { + name: "async handler", + args: args{ + ctx: ctx, + imc: &v1.InMemoryChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n", + Namespace: "ns", + Annotations: map[string]string{ + v1.AsyncHandlerAnnotation: "true", + }, + }, + Status: v1.InMemoryChannelStatus{ + ChannelableStatus: eventingduckv1.ChannelableStatus{ + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTPS("something"), + }, + }, + }, + }, + }, + }, + wantAsync: true, + wantErr: false, + }, + { + name: "sync handler, default", + args: args{ + ctx: ctx, + imc: &v1.InMemoryChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n", + Namespace: "ns", + }, + Status: v1.InMemoryChannelStatus{ + ChannelableStatus: eventingduckv1.ChannelableStatus{ + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTPS("something"), + }, + }, + }, + }, + }, + }, + wantAsync: false, + wantErr: false, + }, + { + name: "sync handler, explicit", + args: args{ + ctx: ctx, + imc: &v1.InMemoryChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "n", + Namespace: "ns", + Annotations: map[string]string{ + v1.AsyncHandlerAnnotation: "false", + }, + }, + Status: v1.InMemoryChannelStatus{ + ChannelableStatus: eventingduckv1.ChannelableStatus{ + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTPS("something"), + }, + }, + }, + }, + }, + }, + wantAsync: false, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := newConfigForInMemoryChannel(tt.args.ctx, tt.args.imc) + if (err != nil) != tt.wantErr { + t.Errorf("newConfigForInMemoryChannel() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got.FanoutConfig.AsyncHandler, tt.wantAsync) { + t.Errorf("newConfigForInMemoryChannel() got = %v, want %v", got, tt.wantAsync) + } + }) + } +} diff --git a/pkg/reconciler/testing/v1/broker.go b/pkg/reconciler/testing/v1/broker.go index e4bfa7141e4..b04ed910d96 100644 --- a/pkg/reconciler/testing/v1/broker.go +++ b/pkg/reconciler/testing/v1/broker.go @@ -61,6 +61,15 @@ func WithBrokerFinalizers(finalizers ...string) BrokerOption { } } +func WithBrokerAnnotation(key, value string) BrokerOption { + return func(b *v1.Broker) { + if b.Annotations == nil { + b.Annotations = map[string]string{} + } + b.Annotations[key] = value + } +} + func WithBrokerResourceVersion(rv string) BrokerOption { return func(b *v1.Broker) { b.ResourceVersion = rv diff --git a/test/rekt/broker_test.go b/test/rekt/broker_test.go index f0c8d866ad4..343553086b2 100644 --- a/test/rekt/broker_test.go +++ b/test/rekt/broker_test.go @@ -192,6 +192,22 @@ func TestBrokerRedelivery(t *testing.T) { env.TestSet(ctx, t, broker.BrokerRedelivery()) } +// TestBrokerPropagatesMetadata test Broker reconciler propagates metadata to channel. +func TestBrokerPropagatesMetadata(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + environment.WithPollTimings(5*time.Second, 4*time.Minute), + ) + + env.ParallelTest(ctx, t, broker.PropagatesMetadata()) +} + func TestBrokerDeadLetterSinkExtensions(t *testing.T) { t.Parallel() diff --git a/test/rekt/channel_test.go b/test/rekt/channel_test.go index c0c32358e39..aad5e42b21d 100644 --- a/test/rekt/channel_test.go +++ b/test/rekt/channel_test.go @@ -23,9 +23,10 @@ import ( "testing" "time" - "knative.dev/eventing/test/rekt/features/authz" "knative.dev/reconciler-test/pkg/feature" + "knative.dev/eventing/test/rekt/features/authz" + "github.com/cloudevents/sdk-go/v2/binding" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/system" @@ -205,6 +206,27 @@ func TestChannelDeadLetterSink(t *testing.T) { env.Test(ctx, t, channel.DeadLetterSink(createSubscriberFn)) } +/* +TestChannelAsyncHandler tests if the async handler can be configured on the channel. +*/ +func TestChannelAsyncHandler(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + createSubscriberFn := func(ref *duckv1.KReference, uri string) manifest.CfgFn { + return subscription.WithSubscriber(ref, uri, "") + } + env.ParallelTest(ctx, t, channel.AsyncHandler(createSubscriberFn)) + env.ParallelTest(ctx, t, channel.AsyncHandlerUpdate(createSubscriberFn)) +} + // TestGenericChannelDeadLetterSink tests if the events that cannot be delivered end up in // the dead letter sink. func TestGenericChannelDeadLetterSink(t *testing.T) { diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index 3704da705b7..9d7f4f09a16 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -19,6 +19,7 @@ package broker import ( "context" "encoding/base64" + "encoding/json" "fmt" "strings" @@ -26,14 +27,18 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/spec" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/state" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/channel" + "knative.dev/eventing/test/rekt/resources/channel_impl" "knative.dev/eventing/test/rekt/resources/subscription" "knative.dev/eventing/test/rekt/resources/trigger" @@ -42,7 +47,7 @@ import ( "knative.dev/pkg/ptr" "knative.dev/reconciler-test/pkg/eventshub" - eventasssert "knative.dev/reconciler-test/pkg/eventshub/assert" + eventassert "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/manifest" "knative.dev/reconciler-test/pkg/resources/service" @@ -173,7 +178,7 @@ func ManyTriggers() *feature.FeatureSet { eventshub.InputEvent(eventToSend), )) - f.Assert("source sent event", eventasssert.OnStore(source). + f.Assert("source sent event", eventassert.OnStore(source). MatchSentEvent(test.HasId(eventToSend.ID())). AtLeast(1), ) @@ -185,7 +190,7 @@ func ManyTriggers() *feature.FeatureSet { // Check on every dumper whether we should expect this event or not if eventFilter.toEventMatcher()(eventToSend) == nil { f.Assert(fmt.Sprintf("%s receive event %s", sink, eventToSend.ID()), func(ctx context.Context, t feature.T) { - eventasssert.OnStore(sink). + eventassert.OnStore(sink). Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). MatchReceivedEvent(test.HasId(eventToSend.ID())). MatchReceivedEvent(matcher). @@ -335,12 +340,12 @@ func brokerChannelFlowWithTransformation(createSubscriberFn func(ref *v1.KRefere eventshub.InputEvent(eventToSend), )) - eventMatcher := eventasssert.MatchEvent( + eventMatcher := eventassert.MatchEvent( test.HasSource(eventSource), test.HasType(eventType), test.HasData([]byte(eventBody)), ) - transformEventMatcher := eventasssert.MatchEvent( + transformEventMatcher := eventassert.MatchEvent( test.HasSource(transformedEventSource), test.HasType(transformedEventType), test.HasData([]byte(transformedBody)), @@ -348,19 +353,19 @@ func brokerChannelFlowWithTransformation(createSubscriberFn func(ref *v1.KRefere f.Stable("(Trigger1 point to) sink1 has all the events"). Must("delivers original events", - eventasssert.OnStore(sink1).Match(eventMatcher).AtLeast(1)) + eventassert.OnStore(sink1).Match(eventMatcher).AtLeast(1)) f.Stable("(Trigger2 point to) sink2 has all the events"). Must("delivers original events", - eventasssert.OnStore(sink2).Match(eventMatcher).AtLeast(1)). + eventassert.OnStore(sink2).Match(eventMatcher).AtLeast(1)). Must("delivers transformation events", - eventasssert.OnStore(sink2).Match(transformEventMatcher).AtLeast(1)) + eventassert.OnStore(sink2).Match(transformEventMatcher).AtLeast(1)) f.Stable("(Trigger3 point to) Channel's subscriber just has events after transformation"). Must("delivers transformation events", - eventasssert.OnStore(sink3).Match(transformEventMatcher).AtLeast(1)). + eventassert.OnStore(sink3).Match(transformEventMatcher).AtLeast(1)). Must("delivers original events", - eventasssert.OnStore(sink3).Match(eventMatcher).Not()) + eventassert.OnStore(sink3).Match(eventMatcher).Not()) return f } @@ -482,13 +487,13 @@ func BrokerEventTransformationForTriggerAssert(f *feature.Feature, eventshub.InputEvent(cfg.EventToSend), )) - eventMatcher := eventasssert.MatchEvent( + eventMatcher := eventassert.MatchEvent( test.HasId(cfg.EventToSend.ID()), test.HasSource(cfg.EventToSend.Source()), test.HasType(cfg.EventToSend.Type()), test.HasData(cfg.EventToSend.Data()), ) - transformEventMatcher := eventasssert.MatchEvent( + transformEventMatcher := eventassert.MatchEvent( test.HasSource(cfg.TransformedEvent.Source()), test.HasType(cfg.TransformedEvent.Type()), test.HasData(cfg.TransformedEvent.Data()), @@ -496,13 +501,13 @@ func BrokerEventTransformationForTriggerAssert(f *feature.Feature, f.Stable("Trigger has filtered all transformed events"). Must("trigger 1 delivers original events", - eventasssert.OnStore(cfg.Sink1).Match(eventMatcher).AtLeast(1)). + eventassert.OnStore(cfg.Sink1).Match(eventMatcher).AtLeast(1)). Must("trigger 1 does not deliver transformed events", - eventasssert.OnStore(cfg.Sink1).Match(transformEventMatcher).Not()). + eventassert.OnStore(cfg.Sink1).Match(transformEventMatcher).Not()). Must("trigger 2 delivers transformed events", - eventasssert.OnStore(cfg.Sink2).Match(transformEventMatcher).AtLeast(1)). + eventassert.OnStore(cfg.Sink2).Match(transformEventMatcher).AtLeast(1)). Must("trigger 2 does not deliver original events", - eventasssert.OnStore(cfg.Sink2).Match(eventMatcher).Not()) + eventassert.OnStore(cfg.Sink2).Match(eventMatcher).Not()) } func BrokerPreferHeaderCheck() *feature.Feature { @@ -544,13 +549,113 @@ func BrokerPreferHeaderCheck() *feature.Feature { f.Stable("test message without explicit prefer header should have the header"). Must("delivers events", - eventasssert.OnStore(sink).Match( - eventasssert.HasAdditionalHeader("Prefer", "reply"), + eventassert.OnStore(sink).Match( + eventassert.HasAdditionalHeader("Prefer", "reply"), ).AtLeast(1)) return f } +func PropagatesMetadata() *feature.Feature { + f := feature.NewFeatureNamed("Broker PreferHeader Check") + + if !broker.EnvCfg.IsMTChannelBasedBroker() { + f.Assert("class is not MTChannelBasedBroker, skipping", func(ctx context.Context, t feature.T) {}) + return f + } + + source := feature.MakeRandomK8sName("source") + sink := feature.MakeRandomK8sName("sink") + via := feature.MakeRandomK8sName("via") + + key := messagingv1.AsyncHandlerAnnotation + value := "false" + + event := test.FullEvent() + event.SetID(uuid.New().String()) + + //Install the broker + brokerName := feature.MakeRandomK8sName("broker") + f.Setup("install broker", broker.Install(brokerName, append(broker.WithEnvConfig(), broker.WithAnnotations( + map[string]interface{}{key: value}, + ))...)) + f.Requirement("broker is ready", broker.IsReady(brokerName)) + f.Requirement("broker is addressable", broker.IsAddressable(brokerName)) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + // Point the Trigger subscriber to the sink svc. + cfg := []manifest.CfgFn{trigger.WithSubscriber(service.AsKReference(sink), ""), trigger.WithBrokerName(brokerName)} + + // Install the trigger + f.Setup("install trigger", trigger.Install(via, cfg...)) + f.Setup("trigger goes ready", trigger.IsReady(via)) + + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(event), + )) + + f.Assert("channel has annotations and labels", func(ctx context.Context, t feature.T) { + d := dynamicclient.Get(ctx) + channelsImpls, err := d.Resource(channel_impl.GVR()). + Namespace(environment.FromContext(ctx).Namespace()). + List(ctx, metav1.ListOptions{}) + if err != nil { + t.Errorf("Failed to list channels (%v): %v", channel_impl.GVR(), err) + return + } + + channels, err := d.Resource(channel.GVR()). + Namespace(environment.FromContext(ctx).Namespace()). + List(ctx, metav1.ListOptions{}) + if err != nil { + t.Errorf("Failed to list channels (%v): %v", channel.GVR(), err) + return + } + + channels.Items = append(channels.Items, channelsImpls.Items...) + + if len(channels.Items) <= 0 { + t.Errorf("No channels found for resources: %#v or %#v", channel_impl.GVR(), channel.GVR()) + } + + found := false + for _, ch := range channels.Items { + for _, or := range ch.GetOwnerReferences() { + if or.Kind == "Broker" && or.Name == brokerName { + v, ok := ch.GetAnnotations()[key] + if !ok { + t.Errorf("Failed to find async handler annotation:\n%#v", ch) + return + } + if v != value { + t.Errorf("Failed to find expected '%s' value for annotation '%s':\n%#v", value, key, ch) + return + } + found = true + break + } + } + } + if !found { + bytes, _ := json.MarshalIndent(channels, "", " ") + t.Errorf("No channel found associated with broker %q\n%#v", brokerName, string(bytes)) + } + }) + f.Assert("event sent", eventassert.OnStore(source). + MatchSentEvent(test.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("event received", eventassert.OnStore(sink). + MatchReceivedEvent(test.HasId(event.ID())). + AtLeast(1), + ) + + return f +} + func BrokerRedelivery() *feature.FeatureSet { fs := &feature.FeatureSet{ Name: "Knative Broker - Redelivery - with different sequences", @@ -606,9 +711,9 @@ func brokerRedeliveryFibonacci(retryNum int32) *feature.Feature { f.Stable("Broker Redelivery following the fibonacci sequence"). Must("delivers events", - eventasssert.OnStore(sink).Match( - eventasssert.MatchKind(eventasssert.EventReceived), - eventasssert.MatchEvent( + eventassert.OnStore(sink).Match( + eventassert.MatchKind(eventassert.EventReceived), + eventassert.MatchEvent( test.HasSource(eventSource), test.HasType(eventType), test.HasData([]byte(eventBody)), @@ -662,11 +767,11 @@ func brokerRedeliveryDropN(retryNum int32, dropNum uint) *feature.Feature { f.Stable("Broker Redelivery failed the first n events"). Must("delivers events", func(ctx context.Context, t feature.T) { - eventasssert.OnStore(sink). + eventassert.OnStore(sink). Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). Match( - eventasssert.MatchKind(eventasssert.EventReceived), - eventasssert.MatchEvent( + eventassert.MatchKind(eventassert.EventReceived), + eventassert.MatchEvent( test.HasSource(eventSource), test.HasType(eventType), test.HasData([]byte(eventBody)), @@ -734,7 +839,7 @@ func brokerSubscriberUnreachable() *feature.Feature { f.Assert("Receives dls extensions when subscriber is unreachable", func(ctx context.Context, t feature.T) { - eventasssert.OnStore(sink). + eventassert.OnStore(sink). Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). MatchEvent( test.HasExtension("knativeerrordest", subscriberUri), @@ -880,8 +985,8 @@ func assertEnhancedWithKnativeErrorExtensions(sinkName string, matcherfns ...fun ctx, t, 1, - eventasssert.MatchKind(eventshub.EventReceived), - eventasssert.MatchEvent(matchers...), + eventassert.MatchKind(eventshub.EventReceived), + eventassert.MatchEvent(matchers...), ) } } @@ -936,7 +1041,7 @@ func brokerSubscriberLongMessage() *feature.Feature { )) f.Assert("receive long event on sink exactly once", - eventasssert.OnStore(sink). + eventassert.OnStore(sink). MatchEvent(test.HasData([]byte(eventBody))). Exact(1), ) @@ -1021,13 +1126,13 @@ func brokerSubscriberLongResponseMessage() *feature.Feature { )) f.Assert("receive long event on sink1 exactly once", - eventasssert.OnStore(sink1). + eventassert.OnStore(sink1). MatchEvent(test.HasData([]byte(eventBody))). Exact(1), ) f.Assert("receive long event on sink2 exactly once", - eventasssert.OnStore(sink2). + eventassert.OnStore(sink2). MatchEvent(test.HasData([]byte(transformedEventBody))). Exact(1), ) diff --git a/test/rekt/features/channel/features.go b/test/rekt/features/channel/features.go index da8a6e9bf5f..22178a0720c 100644 --- a/test/rekt/features/channel/features.go +++ b/test/rekt/features/channel/features.go @@ -25,6 +25,7 @@ import ( "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/network" "knative.dev/reconciler-test/pkg/environment" @@ -37,6 +38,7 @@ import ( eventasssert "knative.dev/reconciler-test/pkg/eventshub/assert" + v1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/channel" "knative.dev/eventing/test/rekt/resources/channel_impl" @@ -134,6 +136,93 @@ func DeadLetterSink(createSubscriberFn func(ref *duckv1.KReference, uri string) return f } +func AsyncHandler(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { + f := feature.NewFeature() + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + name := feature.MakeRandomK8sName("channel") + sub := feature.MakeRandomK8sName("subscription") + + event := test.FullEvent() + event.SetID(uuid.New().String()) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + f.Setup("install channel", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ + v1.AsyncHandlerAnnotation: "true", + }))) + f.Setup("install subscription", subscription.Install(sub, + subscription.WithChannel(channel_impl.AsRef(name)), + createSubscriberFn(service.AsKReference(sink), ""), + )) + f.Setup("channel is ready", channel_impl.IsReady(name)) + f.Setup("subscription is ready", subscription.IsReady(sub)) + + f.Requirement("install source", eventshub.Install(source, eventshub.InputEvent(event), eventshub.StartSenderToResource(channel_impl.GVR(), name))) + + f.Assert("Event sent", assert.OnStore(source). + MatchSentEvent(test.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("sink receives event", assert.OnStore(sink). + MatchEvent(test.HasId(event.ID())). + AtLeast(1), + ) + + return f +} + +func AsyncHandlerUpdate(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { + f := feature.NewFeature() + sink := feature.MakeRandomK8sName("sink") + source := feature.MakeRandomK8sName("source") + name := feature.MakeRandomK8sName("channel") + sub := feature.MakeRandomK8sName("subscription") + + event := test.FullEvent() + event.SetID(uuid.New().String()) + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + f.Setup("install channel", channel_impl.Install(name, channel_impl.WithAnnotations(map[string]interface{}{ + v1.AsyncHandlerAnnotation: "true", + }))) + f.Setup("install subscription", subscription.Install(sub, + subscription.WithChannel(channel_impl.AsRef(name)), + createSubscriberFn(service.AsKReference(sink), ""), + )) + f.Setup("channel is ready", channel_impl.IsReady(name)) + f.Setup("subscription is ready", subscription.IsReady(sub)) + + f.Requirement("update channel async handler", func(ctx context.Context, t feature.T) { + dc := Client(ctx) + + imc, err := dc.ChannelImpl.Get(ctx, name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to retrieve InMemoryChannel: %v", err) + } + // swap and update it to false + imc.SetAnnotations(map[string]string{ + v1.AsyncHandlerAnnotation: "true", + }) + if _, err := dc.ChannelImpl.Update(ctx, imc, metav1.UpdateOptions{}); err != nil { + t.Fatalf("Failed to update async handler annotation: %v", err) + } + }) + + f.Requirement("channel is ready", channel_impl.IsReady(name)) + f.Requirement("install source", eventshub.Install(source, eventshub.InputEvent(event), eventshub.StartSenderToResource(channel_impl.GVR(), name))) + + f.Assert("Event sent", assert.OnStore(source). + MatchSentEvent(test.HasId(event.ID())). + AtLeast(1), + ) + f.Assert("sink receives event", assert.OnStore(sink). + MatchEvent(test.HasId(event.ID())). + AtLeast(1), + ) + + return f +} + func DeadLetterSinkGenericChannel(createSubscriberFn func(ref *duckv1.KReference, uri string) manifest.CfgFn) *feature.Feature { f := feature.NewFeature() sink := feature.MakeRandomK8sName("sink") diff --git a/test/rekt/resources/broker/broker.go b/test/rekt/resources/broker/broker.go index 81c170ad0d4..7bd1e4ce34c 100644 --- a/test/rekt/resources/broker/broker.go +++ b/test/rekt/resources/broker/broker.go @@ -51,6 +51,10 @@ type EnvConfig struct { BrokerTemplatesDir string `envconfig:"BROKER_TEMPLATES"` } +func (cfg EnvConfig) IsMTChannelBasedBroker() bool { + return cfg.BrokerClass == "" || cfg.BrokerClass == "MTChannelBasedBroker" +} + func init() { // Process EventingGlobal. if err := envconfig.Process("", &EnvCfg); err != nil { diff --git a/test/rekt/resources/broker/broker.yaml b/test/rekt/resources/broker/broker.yaml index ec73900e5bb..ba28e55715d 100644 --- a/test/rekt/resources/broker/broker.yaml +++ b/test/rekt/resources/broker/broker.yaml @@ -24,7 +24,7 @@ metadata: {{ end }} {{ if .annotations }} {{ range $key, $value := .annotations }} - {{ $key }}: {{ $value }} + {{ $key }}: "{{ $value }}" {{ end }} {{ end }} {{ end }} diff --git a/test/rekt/resources/broker/broker_test.go b/test/rekt/resources/broker/broker_test.go index 2b3b7f2e418..bc88aa4ec85 100644 --- a/test/rekt/resources/broker/broker_test.go +++ b/test/rekt/resources/broker/broker_test.go @@ -168,7 +168,7 @@ func ExampleWithAnnotations() { // name: foo // namespace: bar // annotations: - // eventing.knative.dev/foo: bar + // eventing.knative.dev/foo: "bar" // spec: } diff --git a/test/rekt/resources/channel_impl/channel_impl.go b/test/rekt/resources/channel_impl/channel_impl.go index 93d51230a6a..7766cd03f6b 100644 --- a/test/rekt/resources/channel_impl/channel_impl.go +++ b/test/rekt/resources/channel_impl/channel_impl.go @@ -173,6 +173,8 @@ func AsDestinationRef(name string) *duckv1.Destination { // WithDeadLetterSink adds the dead letter sink related config to a Subscription spec. var WithDeadLetterSink = delivery.WithDeadLetterSink +var WithAnnotations = manifest.WithAnnotations + // ValidateAddress validates the address retured by Address func ValidateAddress(name string, validate addressable.ValidateAddressFn, timings ...time.Duration) feature.StepFn { return addressable.ValidateAddress(GVR(), name, validate, timings...) diff --git a/test/rekt/resources/channel_impl/channel_impl.yaml b/test/rekt/resources/channel_impl/channel_impl.yaml index 34eb667ca97..78a042beed7 100644 --- a/test/rekt/resources/channel_impl/channel_impl.yaml +++ b/test/rekt/resources/channel_impl/channel_impl.yaml @@ -17,6 +17,12 @@ kind: {{ .kind }} metadata: name: {{ .name }} namespace: {{ .namespace }} + {{ if .annotations }} + annotations: + {{ range $key, $value := .annotations }} + {{ $key }}: "{{ $value }}" + {{ end }} + {{ end }} spec: {{ if .delivery }} delivery: