From d8a2ad5683dfda005cf301a4fde7b1200c37e947 Mon Sep 17 00:00:00 2001 From: Iqbal H <52209787+Iiqbal2000@users.noreply.github.com> Date: Mon, 21 Aug 2023 18:28:21 +0700 Subject: [PATCH 1/7] change the deprecated string (#7179) * change the deprecated string * Apply suggestions from code review Co-authored-by: Calum Murray --------- Co-authored-by: Calum Murray --- test/rekt/features/apiserversource/data_plane.go | 2 +- test/rekt/features/pingsource/features.go | 2 +- test/rekt/resources/eventtype/eventtype.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go index 1adfef8c67b..a00e47693cf 100644 --- a/test/rekt/features/apiserversource/data_plane.go +++ b/test/rekt/features/apiserversource/data_plane.go @@ -256,7 +256,7 @@ func SendsEventsWithEventTypes() *feature.Feature { }) f.Requirement("ApiServerSource goes ready", apiserversource.IsReady(source)) - expectedCeTypes := sets.NewString(sources.ApiServerSourceEventReferenceModeTypes...) + expectedCeTypes := sets.New(sources.ApiServerSourceEventReferenceModeTypes...) f.Stable("ApiServerSource as event source"). Must("delivers events on broker with URI", diff --git a/test/rekt/features/pingsource/features.go b/test/rekt/features/pingsource/features.go index 33cbcc279e3..73784862f8b 100644 --- a/test/rekt/features/pingsource/features.go +++ b/test/rekt/features/pingsource/features.go @@ -160,7 +160,7 @@ func SendsEventsWithEventTypes() *feature.Feature { }) f.Requirement("PingSource goes ready", pingsource.IsReady(source)) - expectedCeTypes := sets.NewString(sourcesv1.PingSourceEventType) + expectedCeTypes := sets.New(sourcesv1.PingSourceEventType) f.Stable("pingsource as event source"). Must("delivers events on broker with URI", assert.OnStore(sink).MatchEvent( diff --git a/test/rekt/resources/eventtype/eventtype.go b/test/rekt/resources/eventtype/eventtype.go index 88c641fb74c..f8156acc562 100644 --- a/test/rekt/resources/eventtype/eventtype.go +++ b/test/rekt/resources/eventtype/eventtype.go @@ -60,7 +60,7 @@ func WaitForEventType(eventtype EventType, timing ...time.Duration) feature.Step } } -func AssertPresent(expectedCeTypes sets.String) EventType { +func AssertPresent(expectedCeTypes sets.Set[string]) EventType { return EventType{ Name: "test eventtypes match or not", EventTypes: func(etl eventingv1beta2.EventTypeList) (bool, error) { From a2e2aa3d515db8dd526fcfd0035fe352a31e40e1 Mon Sep 17 00:00:00 2001 From: Tania Duggal <103496926+taniaduggal@users.noreply.github.com> Date: Tue, 22 Aug 2023 19:12:55 +0530 Subject: [PATCH 2/7] Eventing TLS: Test SinkBinding with Broker as sink (#7172) * tlssinkbindingbrokerassink * Update test/rekt/features/sinkbinding/feature.go Co-authored-by: Calum Murray * Update test/rekt/features/sinkbinding/feature.go Co-authored-by: Calum Murray * Update test/rekt/features/sinkbinding/feature.go Co-authored-by: Calum Murray * Update test/rekt/features/sinkbinding/feature.go Co-authored-by: Calum Murray * Update feature.go * Pass context to `SendsEventsWithBrokerAsSinkTLS` function Co-authored-by: Calum Murray --------- Co-authored-by: Calum Murray Co-authored-by: Pierangelo Di Pilato --- test/rekt/features/sinkbinding/feature.go | 51 +++++++++++++++++++ .../rekt/resources/sinkbinding/sinkbinding.go | 15 ++++++ test/rekt/sink_binding_test.go | 15 ++++++ 3 files changed, 81 insertions(+) diff --git a/test/rekt/features/sinkbinding/feature.go b/test/rekt/features/sinkbinding/feature.go index 5b046b7c6eb..205f6793b25 100644 --- a/test/rekt/features/sinkbinding/feature.go +++ b/test/rekt/features/sinkbinding/feature.go @@ -22,6 +22,8 @@ import ( "github.com/cloudevents/sdk-go/v2/test" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" + "knative.dev/eventing/test/rekt/resources/addressable" + "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/pkg/tracker" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" @@ -76,6 +78,55 @@ func SinkBindingV1Deployment(ctx context.Context) *feature.Feature { return f } +func SendsEventsWithBrokerAsSinkTLS(ctx context.Context) *feature.Feature { + sbinding := feature.MakeRandomK8sName("sinkbinding") + brokerName := feature.MakeRandomK8sName("broker") + sinkName := feature.MakeRandomK8sName("sink") + subject := feature.MakeRandomK8sName("subject") + extensionSecret := string(uuid.NewUUID()) + + f := feature.NewFeatureNamed("SinkBinding V1 Deployment BrokerAsSink test") + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) + f.Setup("broker is ready", broker.IsReady(brokerName)) + f.Setup("broker is addressable", broker.IsAddressable(brokerName)) + f.Setup("Broker has HTTPS address", broker.ValidateAddress(brokerName, addressable.AssertHTTPSAddress)) + env := environment.FromContext(ctx) + f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiverTLS)) + f.Setup("install a deployment", deployment.Install(subject, heartbeatsImage, + deployment.WithEnvs(map[string]string{ + "POD_NAME": "heartbeats", + "POD_NAMESPACE": env.Namespace(), + }))) + + extensions := map[string]string{ + "sinkbinding": extensionSecret, + } + + cfg := []manifest.CfgFn{ + sinkbinding.WithExtensions(extensions), + } + + f.Requirement("install SinkBinding", func(ctx context.Context, t feature.T) { + d := service.AsDestinationRef(sinkName) + d.CACerts = eventshub.GetCaCerts(ctx) + sinkbinding.Install(sbinding, d, deployment.AsTrackerReference(subject), cfg...)(ctx, t) + }) + f.Requirement("SinkBinding goes ready", sinkbinding.IsReady(sbinding)) + + f.Stable("Create a deployment as sinkbinding's subject"). + Must("delivers events", + eventasssert.OnStore(sinkName). + Match(eventasssert.MatchKind(eventshub.EventReceived)). + MatchEvent(test.HasExtension("sinkbinding", extensionSecret)). + AtLeast(1), + ) + + return f +} + func SinkBindingV1Job(ctx context.Context) *feature.Feature { sbinding := feature.MakeRandomK8sName("sinkbinding") sink := feature.MakeRandomK8sName("sink") diff --git a/test/rekt/resources/sinkbinding/sinkbinding.go b/test/rekt/resources/sinkbinding/sinkbinding.go index a0be6655feb..fe143f66559 100644 --- a/test/rekt/resources/sinkbinding/sinkbinding.go +++ b/test/rekt/resources/sinkbinding/sinkbinding.go @@ -126,3 +126,18 @@ func WithSink(d *duckv1.Destination) manifest.CfgFn { func IsReady(name string, timing ...time.Duration) feature.StepFn { return k8s.IsReady(Gvr(), name, timing...) } + +func AsDestinationRef(name string) *duckv1.Destination { + return &duckv1.Destination{ + Ref: AsKReference(name), + } +} + +// AsKReference returns a KReference for a Broker without namespace. +func AsKReference(name string) *duckv1.KReference { + return &duckv1.KReference{ + Kind: "Broker", + Name: name, + APIVersion: "eventing.knative.dev/v1", + } +} diff --git a/test/rekt/sink_binding_test.go b/test/rekt/sink_binding_test.go index 37d8964ca45..d9e3b6ed024 100644 --- a/test/rekt/sink_binding_test.go +++ b/test/rekt/sink_binding_test.go @@ -61,6 +61,21 @@ func TestSinkBindingV1DeploymentTLS(t *testing.T) { env.Test(ctx, t, sinkbinding.SinkBindingV1DeploymentTLS(ctx)) } +func TestSinkBindingV1Deployment_BrokerAsSinkTLS(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + eventshub.WithTLS(t), + ) + + env.Test(ctx, t, sinkbinding.SendsEventsWithBrokerAsSinkTLS(ctx)) +} + func TestSinkBindingV1Job(t *testing.T) { t.Parallel() From 4d8da11047f7a086341810941dac264ce09d1e1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 24 Aug 2023 09:25:56 +0200 Subject: [PATCH 3/7] Upgrade to latest dependencies (#7190) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update dependencies * Update patch output after gomodules.xyz/jsonpatch/v2 update Signed-off-by: Christoph Stäbler * Replace one subscriber to have a predictable patch --------- Signed-off-by: Christoph Stäbler --- go.mod | 10 +- go.sum | 21 ++- .../dispatcher/inmemorychannel_test.go | 16 +- .../gomodules.xyz/jsonpatch/v2/jsonpatch.go | 160 ++++-------------- .../reconciler-test/pkg/manifest/options.go | 1 + vendor/modules.txt | 12 +- 6 files changed, 64 insertions(+), 156 deletions(-) diff --git a/go.mod b/go.mod index ecb89ed032d..18cd4a862c3 100644 --- a/go.mod +++ b/go.mod @@ -44,10 +44,10 @@ require ( k8s.io/apiserver v0.26.5 k8s.io/client-go v0.26.5 k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 - knative.dev/hack v0.0.0-20230814132844-3403e3502fdc - knative.dev/hack/schema v0.0.0-20230814132844-3403e3502fdc - knative.dev/pkg v0.0.0-20230814093643-26b41ba523a0 - knative.dev/reconciler-test v0.0.0-20230810072538-a7237b013cbb + knative.dev/hack v0.0.0-20230815012940-044c02b7a447 + knative.dev/hack/schema v0.0.0-20230815012940-044c02b7a447 + knative.dev/pkg v0.0.0-20230815132840-4f651e092853 + knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674 sigs.k8s.io/yaml v1.3.0 ) @@ -120,7 +120,7 @@ require ( golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.12.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect + gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect google.golang.org/api v0.136.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect diff --git a/go.sum b/go.sum index e9185d43c24..a828b45f18d 100644 --- a/go.sum +++ b/go.sum @@ -122,7 +122,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= @@ -709,8 +708,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY= -gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY= +gomodules.xyz/jsonpatch/v2 v2.3.0 h1:8NFhfS6gzxNqjLIYnZxg319wZ5Qjnx4m/CcX+Klzazc= +gomodules.xyz/jsonpatch/v2 v2.3.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= @@ -871,14 +870,14 @@ k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+O k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 h1:GfD9OzL11kvZN5iArC6oTS7RTj7oJOIfnislxYlqTj8= k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -knative.dev/hack v0.0.0-20230814132844-3403e3502fdc h1:sIJ2MfOFVb3+EIK+n5B7l5GeZxTrHegLzdBTYjreCCU= -knative.dev/hack v0.0.0-20230814132844-3403e3502fdc/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q= -knative.dev/hack/schema v0.0.0-20230814132844-3403e3502fdc h1:ZVpJcVifq3TV/0yXaRkCBAMwCxQSVnhg/9N8cV2dseU= -knative.dev/hack/schema v0.0.0-20230814132844-3403e3502fdc/go.mod h1:GeIb+PLd5mllawcpHEGF5J5fYTQrvgEO5liao8lUKUs= -knative.dev/pkg v0.0.0-20230814093643-26b41ba523a0 h1:fqLbNL2/g016cnF3IQRjFr2aLmhcAnHiA1CANoBesOA= -knative.dev/pkg v0.0.0-20230814093643-26b41ba523a0/go.mod h1:D9D8LkOtlJBC09i+6nbTdZ5LAKsAbTJkkGGXEh2BPYI= -knative.dev/reconciler-test v0.0.0-20230810072538-a7237b013cbb h1:XKIYY3OOzlzuN1hX3jc3UoJyIXe33bghVHyyhIpkr2A= -knative.dev/reconciler-test v0.0.0-20230810072538-a7237b013cbb/go.mod h1:i+/PWK/n3HPgjXMoj5U7CA6WRW/C3c3EfHCQ0FmrhNM= +knative.dev/hack v0.0.0-20230815012940-044c02b7a447 h1:Lr4O/WEyZHuUBFbqATYdQlfLXvhPUCluF4zlgRi59T4= +knative.dev/hack v0.0.0-20230815012940-044c02b7a447/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q= +knative.dev/hack/schema v0.0.0-20230815012940-044c02b7a447 h1:/Q72vaUeJsQ3cVNddEYA8aqb++E2woTDEd8DrmcUsnM= +knative.dev/hack/schema v0.0.0-20230815012940-044c02b7a447/go.mod h1:GeIb+PLd5mllawcpHEGF5J5fYTQrvgEO5liao8lUKUs= +knative.dev/pkg v0.0.0-20230815132840-4f651e092853 h1:OyAYpXLa/jQWClFxRegCccGySyX2/1BVRtKaAWRE/xM= +knative.dev/pkg v0.0.0-20230815132840-4f651e092853/go.mod h1:Y5Tis5nMoapB9iTZywW20Qnv/7LBahrtHz9Sm6+l3LE= +knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674 h1:xgUvk/bKVq0wDgahE/wxmg3sD6j2mjCAimJGtxaQeiY= +knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674/go.mod h1:i+/PWK/n3HPgjXMoj5U7CA6WRW/C3c3EfHCQ0FmrhNM= pgregory.net/rapid v1.0.0 h1:iQaM2w5PZ6xvt6x7hbd7tiDS+nk7YPp5uCaEba+T/F4= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 8eb94926489..a1fda055b0d 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -50,11 +50,11 @@ import ( ) const ( - testNS = "test-namespace" - imcName = "test-imc" - twoSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"},{"observedGeneration":2,"ready":"True","uid":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}]}]` - oneSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"}]}]` - oneSubscriberRemovedOneAddedPatch = `[{"op":"add","path":"/status/subscribers/2","value":{"observedGeneration":2,"ready":"True","uid":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}},{"op":"remove","path":"/status/subscribers/0"}]` + testNS = "test-namespace" + imcName = "test-imc" + twoSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"},{"observedGeneration":2,"ready":"True","uid":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}]}]` + oneSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"}]}]` + oneSubscriberReplaced = `[{"op":"replace","path":"/status/subscribers/1/uid","value":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}]` ) var ( @@ -191,7 +191,7 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress)), }, }, { - Name: "with subscribers, one removed one added to status", + Name: "with subscribers, one replaced to status", Key: imcKey, Objects: []runtime.Object{ NewInMemoryChannel(imcName, testNS, @@ -201,12 +201,12 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelEndpointsReady(), WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelSubscribers(subscribers), - WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber3UID), subscriber3Generation), WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber1UID), subscriber1Generation), + WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber3UID), subscriber3Generation), WithInMemoryChannelAddress(channelServiceAddress)), }, WantPatches: []clientgotesting.PatchActionImpl{ - makePatch(testNS, imcName, oneSubscriberRemovedOneAddedPatch), + makePatch(testNS, imcName, oneSubscriberReplaced), }, }, { Name: "subscriber with delivery spec", diff --git a/vendor/gomodules.xyz/jsonpatch/v2/jsonpatch.go b/vendor/gomodules.xyz/jsonpatch/v2/jsonpatch.go index 0ffb3156042..a411d542c68 100644 --- a/vendor/gomodules.xyz/jsonpatch/v2/jsonpatch.go +++ b/vendor/gomodules.xyz/jsonpatch/v2/jsonpatch.go @@ -1,7 +1,6 @@ package jsonpatch import ( - "bytes" "encoding/json" "fmt" "reflect" @@ -24,21 +23,28 @@ func (j *Operation) Json() string { } func (j *Operation) MarshalJSON() ([]byte, error) { - var b bytes.Buffer - b.WriteString("{") - b.WriteString(fmt.Sprintf(`"op":"%s"`, j.Operation)) - b.WriteString(fmt.Sprintf(`,"path":"%s"`, j.Path)) - // Consider omitting Value for non-nullable operations. - if j.Value != nil || j.Operation == "replace" || j.Operation == "add" { - v, err := json.Marshal(j.Value) - if err != nil { - return nil, err - } - b.WriteString(`,"value":`) - b.Write(v) - } - b.WriteString("}") - return b.Bytes(), nil + // Ensure for add and replace we emit `value: null` + if j.Value == nil && (j.Operation == "replace" || j.Operation == "add") { + return json.Marshal(struct { + Operation string `json:"op"` + Path string `json:"path"` + Value interface{} `json:"value"` + }{ + Operation: j.Operation, + Path: j.Path, + }) + } + // otherwise just marshal normally. We cannot literally do json.Marshal(j) as it would be recursively + // calling this function. + return json.Marshal(struct { + Operation string `json:"op"` + Path string `json:"path"` + Value interface{} `json:"value,omitempty"` + }{ + Operation: j.Operation, + Path: j.Path, + Value: j.Value, + }) } type ByPath []Operation @@ -149,9 +155,6 @@ func makePath(path string, newPart interface{}) string { if path == "" { return "/" + key } - if strings.HasSuffix(path, "/") { - return path + key - } return path + "/" + key } @@ -211,22 +214,18 @@ func handleValues(av, bv interface{}, p string, patch []Operation) ([]Operation, } case []interface{}: bt := bv.([]interface{}) - if isSimpleArray(at) && isSimpleArray(bt) { - patch = append(patch, compareEditDistance(at, bt, p)...) - } else { - n := min(len(at), len(bt)) - for i := len(at) - 1; i >= n; i-- { - patch = append(patch, NewOperation("remove", makePath(p, i), nil)) - } - for i := n; i < len(bt); i++ { - patch = append(patch, NewOperation("add", makePath(p, i), bt[i])) - } - for i := 0; i < n; i++ { - var err error - patch, err = handleValues(at[i], bt[i], makePath(p, i), patch) - if err != nil { - return nil, err - } + n := min(len(at), len(bt)) + for i := len(at) - 1; i >= n; i-- { + patch = append(patch, NewOperation("remove", makePath(p, i), nil)) + } + for i := n; i < len(bt); i++ { + patch = append(patch, NewOperation("add", makePath(p, i), bt[i])) + } + for i := 0; i < n; i++ { + var err error + patch, err = handleValues(at[i], bt[i], makePath(p, i), patch) + if err != nil { + return nil, err } } default: @@ -235,100 +234,9 @@ func handleValues(av, bv interface{}, p string, patch []Operation) ([]Operation, return patch, nil } -func isBasicType(a interface{}) bool { - switch a.(type) { - case string, float64, bool: - default: - return false - } - return true -} - -func isSimpleArray(a []interface{}) bool { - for i := range a { - switch a[i].(type) { - case string, float64, bool: - default: - val := reflect.ValueOf(a[i]) - if val.Kind() == reflect.Map { - for _, k := range val.MapKeys() { - av := val.MapIndex(k) - if av.Kind() == reflect.Ptr || av.Kind() == reflect.Interface { - if av.IsNil() { - continue - } - av = av.Elem() - } - if av.Kind() != reflect.String && av.Kind() != reflect.Float64 && av.Kind() != reflect.Bool { - return false - } - } - return true - } - return false - } - } - return true -} - -// https://en.wikipedia.org/wiki/Wagner%E2%80%93Fischer_algorithm -// Adapted from https://github.com/texttheater/golang-levenshtein -func compareEditDistance(s, t []interface{}, p string) []Operation { - m := len(s) - n := len(t) - - d := make([][]int, m+1) - for i := 0; i <= m; i++ { - d[i] = make([]int, n+1) - d[i][0] = i - } - for j := 0; j <= n; j++ { - d[0][j] = j - } - - for j := 1; j <= n; j++ { - for i := 1; i <= m; i++ { - if reflect.DeepEqual(s[i-1], t[j-1]) { - d[i][j] = d[i-1][j-1] // no op required - } else { - del := d[i-1][j] + 1 - add := d[i][j-1] + 1 - rep := d[i-1][j-1] + 1 - d[i][j] = min(rep, min(add, del)) - } - } - } - - return backtrace(s, t, p, m, n, d) -} - func min(x int, y int) int { if y < x { return y } return x } - -func backtrace(s, t []interface{}, p string, i int, j int, matrix [][]int) []Operation { - if i > 0 && matrix[i-1][j]+1 == matrix[i][j] { - op := NewOperation("remove", makePath(p, i-1), nil) - return append([]Operation{op}, backtrace(s, t, p, i-1, j, matrix)...) - } - if j > 0 && matrix[i][j-1]+1 == matrix[i][j] { - op := NewOperation("add", makePath(p, i), t[j-1]) - return append([]Operation{op}, backtrace(s, t, p, i, j-1, matrix)...) - } - if i > 0 && j > 0 && matrix[i-1][j-1]+1 == matrix[i][j] { - if isBasicType(s[0]) { - op := NewOperation("replace", makePath(p, i-1), t[j-1]) - return append([]Operation{op}, backtrace(s, t, p, i-1, j-1, matrix)...) - } - - p2, _ := handleValues(s[i-1], t[j-1], makePath(p, i-1), []Operation{}) - return append(p2, backtrace(s, t, p, i-1, j-1, matrix)...) - } - if i > 0 && j > 0 && matrix[i-1][j-1] == matrix[i][j] { - return backtrace(s, t, p, i-1, j-1, matrix) - } - return []Operation{} -} diff --git a/vendor/knative.dev/reconciler-test/pkg/manifest/options.go b/vendor/knative.dev/reconciler-test/pkg/manifest/options.go index 94855ddbd6d..bb51fb58a3a 100644 --- a/vendor/knative.dev/reconciler-test/pkg/manifest/options.go +++ b/vendor/knative.dev/reconciler-test/pkg/manifest/options.go @@ -86,6 +86,7 @@ func WithIstioPodAnnotations(cfg map[string]interface{}) { podAnnotations := map[string]interface{}{ "sidecar.istio.io/inject": "true", "sidecar.istio.io/rewriteAppHTTPProbers": "true", + "proxy.istio.io/config": "{ 'holdApplicationUntilProxyStarts': true }", } WithAnnotations(podAnnotations)(cfg) diff --git a/vendor/modules.txt b/vendor/modules.txt index 46f19670b07..812391e586f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -498,8 +498,8 @@ golang.org/x/tools/internal/typeparams ## explicit; go 1.17 golang.org/x/xerrors golang.org/x/xerrors/internal -# gomodules.xyz/jsonpatch/v2 v2.2.0 -## explicit; go 1.12 +# gomodules.xyz/jsonpatch/v2 v2.3.0 +## explicit; go 1.20 gomodules.xyz/jsonpatch/v2 # google.golang.org/api v0.136.0 ## explicit; go 1.19 @@ -1177,17 +1177,17 @@ k8s.io/utils/net k8s.io/utils/pointer k8s.io/utils/strings/slices k8s.io/utils/trace -# knative.dev/hack v0.0.0-20230814132844-3403e3502fdc +# knative.dev/hack v0.0.0-20230815012940-044c02b7a447 ## explicit; go 1.18 knative.dev/hack knative.dev/hack/shell -# knative.dev/hack/schema v0.0.0-20230814132844-3403e3502fdc +# knative.dev/hack/schema v0.0.0-20230815012940-044c02b7a447 ## explicit; go 1.18 knative.dev/hack/schema/commands knative.dev/hack/schema/docs knative.dev/hack/schema/registry knative.dev/hack/schema/schema -# knative.dev/pkg v0.0.0-20230814093643-26b41ba523a0 +# knative.dev/pkg v0.0.0-20230815132840-4f651e092853 ## explicit; go 1.18 knative.dev/pkg/apiextensions/storageversion knative.dev/pkg/apiextensions/storageversion/cmd/migrate @@ -1325,7 +1325,7 @@ knative.dev/pkg/webhook/resourcesemantics knative.dev/pkg/webhook/resourcesemantics/conversion knative.dev/pkg/webhook/resourcesemantics/defaulting knative.dev/pkg/webhook/resourcesemantics/validation -# knative.dev/reconciler-test v0.0.0-20230810072538-a7237b013cbb +# knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674 ## explicit; go 1.18 knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment From 77497715f9ee42fe683c81c5ff092b99d0317e19 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 24 Aug 2023 06:20:57 -0400 Subject: [PATCH 4/7] Added benchmark for any filter (#7195) Signed-off-by: Calum Murray --- .../benchmarks/any_filter_benchmark_test.go | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 pkg/eventfilter/benchmarks/any_filter_benchmark_test.go diff --git a/pkg/eventfilter/benchmarks/any_filter_benchmark_test.go b/pkg/eventfilter/benchmarks/any_filter_benchmark_test.go new file mode 100644 index 00000000000..8386eabceba --- /dev/null +++ b/pkg/eventfilter/benchmarks/any_filter_benchmark_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package benchmarks + +import ( + "testing" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "knative.dev/eventing/pkg/eventfilter" + "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" +) + +func BenchmarkAnyFilter(b *testing.B) { + // Full event with all possible fields filled + event := cetest.FullEvent() + + filter, _ := subscriptionsapi.NewExactFilter(map[string]string{"id": event.ID()}) + prefixFilter, _ := subscriptionsapi.NewPrefixFilter(map[string]string{"type": event.Type()[0:5]}) + suffixFilter, _ := subscriptionsapi.NewSuffixFilter(map[string]string{"source": event.Source()[len(event.Source())-5:]}) + prefixFilterNoMatch, _ := subscriptionsapi.NewPrefixFilter(map[string]string{"type": "qwertyuiop"}) + suffixFilterNoMatch, _ := subscriptionsapi.NewSuffixFilter(map[string]string{"source": "qwertyuiop"}) + + RunFilterBenchmarks(b, + func(i interface{}) eventfilter.Filter { + filters := i.([]eventfilter.Filter) + return subscriptionsapi.NewAnyFilter(filters...) + }, + FilterBenchmark{ + name: "Any filter with exact filter test", + arg: []eventfilter.Filter{filter}, + event: event, + }, + FilterBenchmark{ + name: "Any filter match all subfilters", + arg: []eventfilter.Filter{filter, prefixFilter, suffixFilter}, + event: event, + }, + FilterBenchmark{ + name: "Any filter no 1 match at end of array", + arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch, filter}, + event: event, + }, + FilterBenchmark{ + name: "Any filter no 1 match at start of array", + arg: []eventfilter.Filter{filter, prefixFilterNoMatch, suffixFilterNoMatch}, + event: event, + }, + ) +} From 0045fa97dd54be05f228c33354c9b018f03e8317 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 24 Aug 2023 07:21:56 -0400 Subject: [PATCH 5/7] Broker eventtype autocreate fixes (#7161) * Fixed undefined typemeta on broker in eventtype autocreate Signed-off-by: Calum Murray * Fixed autocreate so that only one eventtype is created when events are sent to mt channel broker Signed-off-by: Calum Murray * Clean up Signed-off-by: Calum Murray * Fixed unit tests Signed-off-by: Calum Murray * channel only creates eventtypes if not owned by broker Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray --- cmd/broker/ingress/main.go | 18 ++++++++++++++++-- pkg/broker/ingress/ingress_handler.go | 9 ++++++++- .../dispatcher/inmemorychannel.go | 15 ++++++++++----- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 67817f4eafc..bd8376dbe67 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -131,12 +131,26 @@ func main() { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + var featureStore *feature.Store + var handler *ingress.Handler + + featureStore = feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + featureFlags := value.(feature.Flags) + if featureFlags.IsEnabled(feature.EvenTypeAutoCreate) && featureStore != nil && handler != nil { + autoCreate := &eventtype.EventTypeAutoHandler{ + EventTypeLister: eventtypeinformer.Get(ctx).Lister(), + EventingClient: eventingclient.Get(ctx).EventingV1beta2(), + FeatureStore: featureStore, + Logger: logger, + } + handler.EvenTypeHandler = autoCreate + } + }) featureStore.WatchConfigs(configMapWatcher) reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) - handler, err := ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer) + handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) } diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index ba9ee64cc84..7c4fbeb5795 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -247,13 +247,20 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } func toKReference(broker *eventingv1.Broker) *duckv1.KReference { - return &duckv1.KReference{ + kref := &duckv1.KReference{ Kind: broker.Kind, Namespace: broker.Namespace, Name: broker.Name, APIVersion: broker.APIVersion, Address: broker.Status.Address.Name, } + if kref.Kind == "" { + kref.Kind = "Broker" + } + if kref.APIVersion == "" { + kref.APIVersion = "eventing.knative.dev/v1" + } + return kref } func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerNamespace, brokerName string) (int, time.Duration) { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 05e22d5a998..14e157db781 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -93,17 +93,22 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec return err } var eventTypeAutoHandler *eventtype.EventTypeAutoHandler - var channelReference *duckv1.KReference + var channelRef *duckv1.KReference var UID *types.UID - if r.featureStore.IsEnabled(feature.EvenTypeAutoCreate) { + if ownerReferences := imc.GetOwnerReferences(); r.featureStore.IsEnabled(feature.EvenTypeAutoCreate) && + (len(ownerReferences) == 0 || + ownerReferences[0].Kind != "Broker") { + logging.FromContext(ctx).Info("EventType autocreate is enabled, creating handler") eventTypeAutoHandler = &eventtype.EventTypeAutoHandler{ EventTypeLister: r.eventTypeLister, EventingClient: r.eventingClient, FeatureStore: r.featureStore, Logger: logging.FromContext(ctx).Desugar(), } - channelReference = toKReference(imc) + + channelRef = toKReference(imc) UID = &imc.UID + } // First grab the host based MultiChannelFanoutMessage httpHandler @@ -115,7 +120,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec config.FanoutConfig, r.reporter, eventTypeAutoHandler, - channelReference, + channelRef, UID, ) if err != nil { @@ -143,7 +148,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec config.FanoutConfig, r.reporter, eventTypeAutoHandler, - channelReference, + channelRef, UID, channel.ResolveChannelFromPath(channel.ParseChannelFromPath), ) From 11f1ee4ef821c70cf4098b51441bf8288efa615b Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 24 Aug 2023 07:22:03 -0400 Subject: [PATCH 6/7] Added rekt test for channel event autocreate (#7198) Signed-off-by: Calum Murray --- test/experimental/config/features.yaml | 1 + .../experimental/eventtype_autocreate_test.go | 42 +++++++++++++ .../features/eventtype_autocreate/features.go | 61 +++++++++++++++++++ 3 files changed, 104 insertions(+) create mode 100644 test/experimental/eventtype_autocreate_test.go create mode 100644 test/experimental/features/eventtype_autocreate/features.go diff --git a/test/experimental/config/features.yaml b/test/experimental/config/features.yaml index 01955b023dc..8968e320835 100644 --- a/test/experimental/config/features.yaml +++ b/test/experimental/config/features.yaml @@ -25,3 +25,4 @@ data: delivery-retryafter: "enabled" delivery-timeout: "enabled" new-trigger-filters: "enabled" + eventtype-auto-create: "enabled" diff --git a/test/experimental/eventtype_autocreate_test.go b/test/experimental/eventtype_autocreate_test.go new file mode 100644 index 00000000000..b64404bcc80 --- /dev/null +++ b/test/experimental/eventtype_autocreate_test.go @@ -0,0 +1,42 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2023 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package experimental + +import ( + "testing" + + "knative.dev/eventing/test/experimental/features/eventtype_autocreate" + + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestIMCEventTypeAutoCreate(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, eventtype_autocreate.AutoCreateEventTypesOnIMC()) +} diff --git a/test/experimental/features/eventtype_autocreate/features.go b/test/experimental/features/eventtype_autocreate/features.go new file mode 100644 index 00000000000..a2248e30ab2 --- /dev/null +++ b/test/experimental/features/eventtype_autocreate/features.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtype_autocreate + +import ( + cetest "github.com/cloudevents/sdk-go/v2/test" + "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/eventing/test/rekt/resources/channel_impl" + "knative.dev/eventing/test/rekt/resources/eventtype" + "knative.dev/eventing/test/rekt/resources/subscription" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/resources/service" +) + +func AutoCreateEventTypesOnIMC() *feature.Feature { + f := feature.NewFeature() + + event := cetest.FullEvent() + event.SetType("test.custom.event.type") + + sender := feature.MakeRandomK8sName("sender") + sub := feature.MakeRandomK8sName("subscription") + channelName := feature.MakeRandomK8sName("channel") + sink := feature.MakeRandomK8sName("sink") + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + f.Setup("install channel", channel_impl.Install(channelName)) + f.Setup("install subscription", subscription.Install(sub, + subscription.WithChannel(channel_impl.AsRef(channelName)), + subscription.WithSubscriber(service.AsKReference(sink), ""), + )) + + f.Setup("subscription is ready", subscription.IsReady(sub)) + f.Setup("channel is ready", channel_impl.IsReady(channelName)) + + f.Requirement("install event sender", eventshub.Install(sender, + eventshub.StartSenderToResource(channel_impl.GVR(), channelName), + eventshub.InputEvent(event), + )) + + expectedTypes := sets.New(event.Type()) + + f.Alpha("imc"). + Must("deliver events to subscriber", assert.OnStore(sink).MatchEvent(cetest.HasId(event.ID())).AtLeast(1)). + Must("create event type", eventtype.WaitForEventType(eventtype.AssertPresent(expectedTypes))) + + return f +} From 08cf00f0389dc55444db31a067d5a1a1270c0d19 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 24 Aug 2023 08:21:57 -0400 Subject: [PATCH 7/7] Add rekt test for "Any" filter (#7130) * Added utilities for creating filters in rekt tests Signed-off-by: Calum Murray * Added reconciler test for 'Any' filter Signed-off-by: Calum Murray * Refactored bulk of testing setup into separate reusable function Signed-off-by: Calum Murray * separated into different files Signed-off-by: Calum Murray * fixed cesql syntax Signed-off-by: Calum Murray * hopefully fix license check Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray --- .../features/new_trigger_filters/feature.go | 204 ++++++++++++++++++ .../features/new_trigger_filters/filters.go | 134 +++++------- test/experimental/new_trigger_filters_test.go | 16 ++ 3 files changed, 270 insertions(+), 84 deletions(-) create mode 100644 test/experimental/features/new_trigger_filters/feature.go diff --git a/test/experimental/features/new_trigger_filters/feature.go b/test/experimental/features/new_trigger_filters/feature.go new file mode 100644 index 00000000000..ccb4958a11b --- /dev/null +++ b/test/experimental/features/new_trigger_filters/feature.go @@ -0,0 +1,204 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package new_trigger_filters + +import ( + "fmt" + + . "github.com/cloudevents/sdk-go/v2/test" + "knative.dev/reconciler-test/pkg/eventshub" + . "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/manifest" + "knative.dev/reconciler-test/pkg/resources/service" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/trigger" +) + +// FiltersFeatureSet creates a feature set for testing the broker implementation of the new trigger filters experimental feature +// (aka Cloud Events Subscriptions API filters). It requires a created and ready Broker resource with brokerName. +// +// The feature set tests four filter dialects: exact, prefix, suffix and cesql (aka CloudEvents SQL). +func FiltersFeatureSet(brokerName string) *feature.FeatureSet { + matchedEvent := FullEvent() + unmatchedEvent := MinEvent() + unmatchedEvent.SetType("org.wrong.type") + unmatchedEvent.SetSource("org.wrong.source") + + features := make([]*feature.Feature, 0, 8) + tests := map[string]struct { + filters []eventingv1.SubscriptionsAPIFilter + step feature.StepFn + }{ + "Exact filter": { + filters: []eventingv1.SubscriptionsAPIFilter{ + { + Exact: map[string]string{ + "type": matchedEvent.Type(), + "source": matchedEvent.Source(), + }, + }, + }, + }, + "Prefix filter": { + filters: []eventingv1.SubscriptionsAPIFilter{ + { + Prefix: map[string]string{ + "type": matchedEvent.Type()[:4], + "source": matchedEvent.Source()[:4], + }, + }, + }, + }, + "Suffix filter": { + filters: []eventingv1.SubscriptionsAPIFilter{ + { + Suffix: map[string]string{ + "type": matchedEvent.Type()[5:], + "source": matchedEvent.Source()[5:], + }, + }, + }, + }, + "CloudEvents SQL filter": { + filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: fmt.Sprintf("type = '%s' AND source = '%s'", matchedEvent.Type(), matchedEvent.Source()), + }, + }, + }, + } + + for name, fs := range tests { + matchedSender := feature.MakeRandomK8sName("sender") + unmatchedSender := feature.MakeRandomK8sName("sender") + subscriber := feature.MakeRandomK8sName("subscriber") + triggerName := feature.MakeRandomK8sName("viaTrigger") + + f := feature.NewFeatureNamed(name) + + f.Setup("Install trigger subscriber", eventshub.Install(subscriber, eventshub.StartReceiver)) + + // Set the Trigger subscriber. + cfg := []manifest.CfgFn{ + trigger.WithSubscriber(service.AsKReference(subscriber), ""), + trigger.WithNewFilters(fs.filters), + } + + f.Setup("Install trigger", trigger.Install(triggerName, brokerName, cfg...)) + f.Setup("Wait for trigger to become ready", trigger.IsReady(triggerName)) + f.Setup("Broker is addressable", k8s.IsAddressable(broker.GVR(), brokerName)) + + f.Requirement("Install matched event sender", eventshub.Install(matchedSender, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(matchedEvent)), + ) + + f.Requirement("Install unmatched event sender", eventshub.Install(unmatchedSender, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(unmatchedEvent)), + ) + + f.Alpha("Triggers with new filters"). + Must("must deliver matched events", OnStore(subscriber).MatchEvent(HasId(matchedEvent.ID())).AtLeast(1)). + MustNot("must not deliver unmatched events", OnStore(subscriber).MatchEvent(HasId(unmatchedEvent.ID())).Not()) + features = append(features, f) + } + + return &feature.FeatureSet{ + Name: "New trigger filters", + Features: features, + } +} + +type CloudEventsContext struct { + eventType string + eventSource string + eventSubject string + eventID string + eventDataSchema string + eventDataContentType string + shouldDeliver bool +} + +func AnyFilterFeature(brokerName string) *feature.Feature { + f := feature.NewFeature() + + eventContexts := []CloudEventsContext{ + { + eventType: "exact.event.type", + shouldDeliver: true, + }, + { + eventType: "prefix.event.type", + shouldDeliver: true, + }, + { + eventType: "event.type.suffix", + shouldDeliver: true, + }, + { + eventType: "not.type.event", + shouldDeliver: true, + }, + { + eventType: "cesql.event.type", + shouldDeliver: true, + }, + { + eventType: "not.event.type", + shouldDeliver: false, + }, + } + + filters := []eventingv1.SubscriptionsAPIFilter{ + { + Any: []eventingv1.SubscriptionsAPIFilter{ + { + Exact: map[string]string{ + "type": "exact.event.type", + }, + }, + { + Prefix: map[string]string{ + "type": "prefix", + }, + }, + { + Suffix: map[string]string{ + "type": "suffix", + }, + }, + { + Not: &eventingv1.SubscriptionsAPIFilter{ + CESQL: "type LIKE '%event.type%'", + }, + }, + { + CESQL: "type = 'cesql.event.type'", + }, + }, + }, + } + + f = newEventFilterFeature(eventContexts, filters, f, brokerName) + + return f +} diff --git a/test/experimental/features/new_trigger_filters/filters.go b/test/experimental/features/new_trigger_filters/filters.go index 76d108bab56..86430a31195 100644 --- a/test/experimental/features/new_trigger_filters/filters.go +++ b/test/experimental/features/new_trigger_filters/filters.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The Knative Authors +Copyright 2023 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -27,103 +27,69 @@ import ( "knative.dev/reconciler-test/pkg/manifest" "knative.dev/reconciler-test/pkg/resources/service" + "github.com/cloudevents/sdk-go/v2/event" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/trigger" ) -// FiltersFeatureSet creates a feature set for testing the broker implementation of the new trigger filters experimental feature -// (aka Cloud Events Subscriptions API filters). It requires a created and ready Broker resource with brokerName. -// -// The feature set tests four filter dialects: exact, prefix, suffix and cesql (aka CloudEvents SQL). -func FiltersFeatureSet(brokerName string) *feature.FeatureSet { - matchedEvent := FullEvent() - unmatchedEvent := MinEvent() - unmatchedEvent.SetType("org.wrong.type") - unmatchedEvent.SetSource("org.wrong.source") - - features := make([]*feature.Feature, 0, 8) - tests := map[string]struct { - filters []eventingv1.SubscriptionsAPIFilter - step feature.StepFn - }{ - "Exact filter": { - filters: []eventingv1.SubscriptionsAPIFilter{ - { - Exact: map[string]string{ - "type": matchedEvent.Type(), - "source": matchedEvent.Source(), - }, - }, - }, - }, - "Prefix filter": { - filters: []eventingv1.SubscriptionsAPIFilter{ - { - Prefix: map[string]string{ - "type": matchedEvent.Type()[:4], - "source": matchedEvent.Source()[:4], - }, - }, - }, - }, - "Suffix filter": { - filters: []eventingv1.SubscriptionsAPIFilter{ - { - Suffix: map[string]string{ - "type": matchedEvent.Type()[5:], - "source": matchedEvent.Source()[5:], - }, - }, - }, - }, - "CloudEvents SQL filter": { - filters: []eventingv1.SubscriptionsAPIFilter{ - { - CESQL: fmt.Sprintf("type = '%s' AND source = '%s'", matchedEvent.Type(), matchedEvent.Source()), - }, - }, - }, - } +func newEventFilterFeature(eventContexts []CloudEventsContext, filters []eventingv1.SubscriptionsAPIFilter, f *feature.Feature, brokerName string) *feature.Feature { + subscriberName := feature.MakeRandomK8sName("subscriber") + triggerName := feature.MakeRandomK8sName("trigger") - for name, fs := range tests { - matchedSender := feature.MakeRandomK8sName("sender") - unmatchedSender := feature.MakeRandomK8sName("sender") - subscriber := feature.MakeRandomK8sName("subscriber") - triggerName := feature.MakeRandomK8sName("viaTrigger") + f.Setup("Install trigger subscriber", eventshub.Install(subscriberName, eventshub.StartReceiver)) - f := feature.NewFeatureNamed(name) + cfg := []manifest.CfgFn{ + trigger.WithSubscriber(service.AsKReference(subscriberName), ""), + trigger.WithNewFilters(filters), + } - f.Setup("Install trigger subscriber", eventshub.Install(subscriber, eventshub.StartReceiver)) + f.Setup("Install trigger", trigger.Install(triggerName, brokerName, cfg...)) + f.Setup("Wait for trigger to become ready", trigger.IsReady(triggerName)) + f.Setup("Broker is addressable", k8s.IsAddressable(broker.GVR(), brokerName)) - // Set the Trigger subscriber. - cfg := []manifest.CfgFn{ - trigger.WithSubscriber(service.AsKReference(subscriber), ""), - trigger.WithNewFilters(fs.filters), - } + asserter := f.Alpha("New filters") - f.Setup("Install trigger", trigger.Install(triggerName, brokerName, cfg...)) - f.Setup("Wait for trigger to become ready", trigger.IsReady(triggerName)) - f.Setup("Broker is addressable", k8s.IsAddressable(broker.GVR(), brokerName)) + for _, eventCtx := range eventContexts { + event := newEventFromEventContext(eventCtx) + eventSender := feature.MakeRandomK8sName("sender") - f.Requirement("Install matched event sender", eventshub.Install(matchedSender, + f.Requirement(fmt.Sprintf("Install event sender %s", eventSender), eventshub.Install(eventSender, eventshub.StartSenderToResource(broker.GVR(), brokerName), - eventshub.InputEvent(matchedEvent)), - ) + eventshub.InputEvent(event), + )) - f.Requirement("Install unmatched event sender", eventshub.Install(unmatchedSender, - eventshub.StartSenderToResource(broker.GVR(), brokerName), - eventshub.InputEvent(unmatchedEvent)), - ) - - f.Alpha("Triggers with new filters"). - Must("must deliver matched events", OnStore(subscriber).MatchEvent(HasId(matchedEvent.ID())).AtLeast(1)). - MustNot("must not deliver unmatched events", OnStore(subscriber).MatchEvent(HasId(unmatchedEvent.ID())).Not()) - features = append(features, f) + if eventCtx.shouldDeliver { + asserter.Must("must deliver matched event", OnStore(subscriberName).MatchEvent(HasId(event.ID())).AtLeast(1)) + } else { + asserter.MustNot("must not deliver unmatched event", OnStore(subscriberName).MatchEvent(HasId(event.ID())).Not()) + } } - return &feature.FeatureSet{ - Name: "New trigger filters", - Features: features, + return f +} + +func newEventFromEventContext(eventCtx CloudEventsContext) event.Event { + event := MinEvent() + // Ensure that each event has a unique ID + event.SetID(feature.MakeRandomK8sName("event")) + if eventCtx.eventType != "" { + event.SetType(eventCtx.eventType) + } + if eventCtx.eventSource != "" { + event.SetSource(eventCtx.eventSource) + } + if eventCtx.eventSubject != "" { + event.SetSubject(eventCtx.eventSubject) + } + if eventCtx.eventID != "" { + event.SetID(eventCtx.eventID) + } + if eventCtx.eventDataSchema != "" { + event.SetDataSchema(eventCtx.eventDataSchema) + } + if eventCtx.eventDataContentType != "" { + event.SetDataContentType(eventCtx.eventDataContentType) } + return event } diff --git a/test/experimental/new_trigger_filters_test.go b/test/experimental/new_trigger_filters_test.go index ca11e0f7a92..af55a53089c 100644 --- a/test/experimental/new_trigger_filters_test.go +++ b/test/experimental/new_trigger_filters_test.go @@ -49,6 +49,22 @@ func TestMTChannelBrokerNewTriggerFilters(t *testing.T) { env.TestSet(ctx, t, newfilters.FiltersFeatureSet(brokerName)) } +func TestMTChannelBrokerAnyTriggerFilters(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + brokerName := "default" + + env.Prerequisite(ctx, t, InstallMTBroker(brokerName)) + env.Test(ctx, t, newfilters.AnyFilterFeature(brokerName)) +} + func InstallMTBroker(name string) *feature.Feature { f := feature.NewFeatureNamed("Multi-tenant channel-based broker") f.Setup(fmt.Sprintf("Install broker %q", name), broker.Install(name, broker.WithEnvConfig()...))