Skip to content

Commit

Permalink
Merge branch 'main' of github.com:knative/eventing into deprecate-kntest
Browse files Browse the repository at this point in the history
  • Loading branch information
upodroid committed Aug 24, 2023
2 parents 215b64c + 08cf00f commit d783508
Show file tree
Hide file tree
Showing 22 changed files with 619 additions and 253 deletions.
18 changes: 16 additions & 2 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,26 @@ func main() {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
var featureStore *feature.Store
var handler *ingress.Handler

featureStore = feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
featureFlags := value.(feature.Flags)
if featureFlags.IsEnabled(feature.EvenTypeAutoCreate) && featureStore != nil && handler != nil {
autoCreate := &eventtype.EventTypeAutoHandler{
EventTypeLister: eventtypeinformer.Get(ctx).Lister(),
EventingClient: eventingclient.Get(ctx).EventingV1beta2(),
FeatureStore: featureStore,
Logger: logger,
}
handler.EvenTypeHandler = autoCreate
}
})
featureStore.WatchConfigs(configMapWatcher)

reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

handler, err := ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer)
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ require (
k8s.io/apiserver v0.26.5
k8s.io/client-go v0.26.5
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2
knative.dev/hack v0.0.0-20230818155117-9cc05a31e8c0
knative.dev/hack/schema v0.0.0-20230814132844-3403e3502fdc
knative.dev/pkg v0.0.0-20230814093643-26b41ba523a0
knative.dev/reconciler-test v0.0.0-20230810072538-a7237b013cbb
knative.dev/hack v0.0.0-20230815012940-044c02b7a447
knative.dev/hack/schema v0.0.0-20230815012940-044c02b7a447
knative.dev/pkg v0.0.0-20230815132840-4f651e092853
knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674
sigs.k8s.io/yaml v1.3.0
)

Expand Down Expand Up @@ -120,7 +120,7 @@ require (
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.12.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect
google.golang.org/api v0.136.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect
Expand Down
23 changes: 10 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
Expand Down Expand Up @@ -709,8 +708,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
gomodules.xyz/jsonpatch/v2 v2.3.0 h1:8NFhfS6gzxNqjLIYnZxg319wZ5Qjnx4m/CcX+Klzazc=
gomodules.xyz/jsonpatch/v2 v2.3.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca h1:PupagGYwj8+I4ubCxcmcBRk3VlUWtTg5huQpZR9flmE=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
Expand Down Expand Up @@ -871,16 +870,14 @@ k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+O
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4=
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 h1:GfD9OzL11kvZN5iArC6oTS7RTj7oJOIfnislxYlqTj8=
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/hack v0.0.0-20230814132844-3403e3502fdc h1:sIJ2MfOFVb3+EIK+n5B7l5GeZxTrHegLzdBTYjreCCU=
knative.dev/hack v0.0.0-20230814132844-3403e3502fdc/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/hack v0.0.0-20230818155117-9cc05a31e8c0 h1:n9YEGYuoj31pAkhGlNL+xTQAeXKYTLeMmIZLWA9fAeo=
knative.dev/hack v0.0.0-20230818155117-9cc05a31e8c0/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/hack/schema v0.0.0-20230814132844-3403e3502fdc h1:ZVpJcVifq3TV/0yXaRkCBAMwCxQSVnhg/9N8cV2dseU=
knative.dev/hack/schema v0.0.0-20230814132844-3403e3502fdc/go.mod h1:GeIb+PLd5mllawcpHEGF5J5fYTQrvgEO5liao8lUKUs=
knative.dev/pkg v0.0.0-20230814093643-26b41ba523a0 h1:fqLbNL2/g016cnF3IQRjFr2aLmhcAnHiA1CANoBesOA=
knative.dev/pkg v0.0.0-20230814093643-26b41ba523a0/go.mod h1:D9D8LkOtlJBC09i+6nbTdZ5LAKsAbTJkkGGXEh2BPYI=
knative.dev/reconciler-test v0.0.0-20230810072538-a7237b013cbb h1:XKIYY3OOzlzuN1hX3jc3UoJyIXe33bghVHyyhIpkr2A=
knative.dev/reconciler-test v0.0.0-20230810072538-a7237b013cbb/go.mod h1:i+/PWK/n3HPgjXMoj5U7CA6WRW/C3c3EfHCQ0FmrhNM=
knative.dev/hack v0.0.0-20230815012940-044c02b7a447 h1:Lr4O/WEyZHuUBFbqATYdQlfLXvhPUCluF4zlgRi59T4=
knative.dev/hack v0.0.0-20230815012940-044c02b7a447/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/hack/schema v0.0.0-20230815012940-044c02b7a447 h1:/Q72vaUeJsQ3cVNddEYA8aqb++E2woTDEd8DrmcUsnM=
knative.dev/hack/schema v0.0.0-20230815012940-044c02b7a447/go.mod h1:GeIb+PLd5mllawcpHEGF5J5fYTQrvgEO5liao8lUKUs=
knative.dev/pkg v0.0.0-20230815132840-4f651e092853 h1:OyAYpXLa/jQWClFxRegCccGySyX2/1BVRtKaAWRE/xM=
knative.dev/pkg v0.0.0-20230815132840-4f651e092853/go.mod h1:Y5Tis5nMoapB9iTZywW20Qnv/7LBahrtHz9Sm6+l3LE=
knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674 h1:xgUvk/bKVq0wDgahE/wxmg3sD6j2mjCAimJGtxaQeiY=
knative.dev/reconciler-test v0.0.0-20230817080342-39774f133674/go.mod h1:i+/PWK/n3HPgjXMoj5U7CA6WRW/C3c3EfHCQ0FmrhNM=
pgregory.net/rapid v1.0.0 h1:iQaM2w5PZ6xvt6x7hbd7tiDS+nk7YPp5uCaEba+T/F4=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
Expand Down
9 changes: 8 additions & 1 deletion pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,20 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

func toKReference(broker *eventingv1.Broker) *duckv1.KReference {
return &duckv1.KReference{
kref := &duckv1.KReference{
Kind: broker.Kind,
Namespace: broker.Namespace,
Name: broker.Name,
APIVersion: broker.APIVersion,
Address: broker.Status.Address.Name,
}
if kref.Kind == "" {
kref.Kind = "Broker"
}
if kref.APIVersion == "" {
kref.APIVersion = "eventing.knative.dev/v1"
}
return kref
}

func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerNamespace, brokerName string) (int, time.Duration) {
Expand Down
63 changes: 63 additions & 0 deletions pkg/eventfilter/benchmarks/any_filter_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2023 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package benchmarks

import (
"testing"

cetest "github.com/cloudevents/sdk-go/v2/test"
"knative.dev/eventing/pkg/eventfilter"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)

func BenchmarkAnyFilter(b *testing.B) {
// Full event with all possible fields filled
event := cetest.FullEvent()

filter, _ := subscriptionsapi.NewExactFilter(map[string]string{"id": event.ID()})
prefixFilter, _ := subscriptionsapi.NewPrefixFilter(map[string]string{"type": event.Type()[0:5]})
suffixFilter, _ := subscriptionsapi.NewSuffixFilter(map[string]string{"source": event.Source()[len(event.Source())-5:]})
prefixFilterNoMatch, _ := subscriptionsapi.NewPrefixFilter(map[string]string{"type": "qwertyuiop"})
suffixFilterNoMatch, _ := subscriptionsapi.NewSuffixFilter(map[string]string{"source": "qwertyuiop"})

RunFilterBenchmarks(b,
func(i interface{}) eventfilter.Filter {
filters := i.([]eventfilter.Filter)
return subscriptionsapi.NewAnyFilter(filters...)
},
FilterBenchmark{
name: "Any filter with exact filter test",
arg: []eventfilter.Filter{filter},
event: event,
},
FilterBenchmark{
name: "Any filter match all subfilters",
arg: []eventfilter.Filter{filter, prefixFilter, suffixFilter},
event: event,
},
FilterBenchmark{
name: "Any filter no 1 match at end of array",
arg: []eventfilter.Filter{prefixFilterNoMatch, suffixFilterNoMatch, filter},
event: event,
},
FilterBenchmark{
name: "Any filter no 1 match at start of array",
arg: []eventfilter.Filter{filter, prefixFilterNoMatch, suffixFilterNoMatch},
event: event,
},
)
}
15 changes: 10 additions & 5 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,22 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
return err
}
var eventTypeAutoHandler *eventtype.EventTypeAutoHandler
var channelReference *duckv1.KReference
var channelRef *duckv1.KReference
var UID *types.UID
if r.featureStore.IsEnabled(feature.EvenTypeAutoCreate) {
if ownerReferences := imc.GetOwnerReferences(); r.featureStore.IsEnabled(feature.EvenTypeAutoCreate) &&
(len(ownerReferences) == 0 ||
ownerReferences[0].Kind != "Broker") {
logging.FromContext(ctx).Info("EventType autocreate is enabled, creating handler")
eventTypeAutoHandler = &eventtype.EventTypeAutoHandler{
EventTypeLister: r.eventTypeLister,
EventingClient: r.eventingClient,
FeatureStore: r.featureStore,
Logger: logging.FromContext(ctx).Desugar(),
}
channelReference = toKReference(imc)

channelRef = toKReference(imc)
UID = &imc.UID

}

// First grab the host based MultiChannelFanoutMessage httpHandler
Expand All @@ -115,7 +120,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
config.FanoutConfig,
r.reporter,
eventTypeAutoHandler,
channelReference,
channelRef,
UID,
)
if err != nil {
Expand Down Expand Up @@ -143,7 +148,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
config.FanoutConfig,
r.reporter,
eventTypeAutoHandler,
channelReference,
channelRef,
UID,
channel.ResolveChannelFromPath(channel.ParseChannelFromPath),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ import (
)

const (
testNS = "test-namespace"
imcName = "test-imc"
twoSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"},{"observedGeneration":2,"ready":"True","uid":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}]}]`
oneSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"}]}]`
oneSubscriberRemovedOneAddedPatch = `[{"op":"add","path":"/status/subscribers/2","value":{"observedGeneration":2,"ready":"True","uid":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}},{"op":"remove","path":"/status/subscribers/0"}]`
testNS = "test-namespace"
imcName = "test-imc"
twoSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"},{"observedGeneration":2,"ready":"True","uid":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}]}]`
oneSubscriberPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"}]}]`
oneSubscriberReplaced = `[{"op":"replace","path":"/status/subscribers/1/uid","value":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}]`
)

var (
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelAddress(channelServiceAddress)),
},
}, {
Name: "with subscribers, one removed one added to status",
Name: "with subscribers, one replaced to status",
Key: imcKey,
Objects: []runtime.Object{
NewInMemoryChannel(imcName, testNS,
Expand All @@ -201,12 +201,12 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelEndpointsReady(),
WithInMemoryChannelChannelServiceReady(),
WithInMemoryChannelSubscribers(subscribers),
WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber3UID), subscriber3Generation),
WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber1UID), subscriber1Generation),
WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber3UID), subscriber3Generation),
WithInMemoryChannelAddress(channelServiceAddress)),
},
WantPatches: []clientgotesting.PatchActionImpl{
makePatch(testNS, imcName, oneSubscriberRemovedOneAddedPatch),
makePatch(testNS, imcName, oneSubscriberReplaced),
},
}, {
Name: "subscriber with delivery spec",
Expand Down
1 change: 1 addition & 0 deletions test/experimental/config/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ data:
delivery-retryafter: "enabled"
delivery-timeout: "enabled"
new-trigger-filters: "enabled"
eventtype-auto-create: "enabled"
42 changes: 42 additions & 0 deletions test/experimental/eventtype_autocreate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//go:build e2e
// +build e2e

/*
Copyright 2023 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package experimental

import (
"testing"

"knative.dev/eventing/test/experimental/features/eventtype_autocreate"

"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
)

func TestIMCEventTypeAutoCreate(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.Test(ctx, t, eventtype_autocreate.AutoCreateEventTypesOnIMC())
}
61 changes: 61 additions & 0 deletions test/experimental/features/eventtype_autocreate/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2023 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventtype_autocreate

import (
cetest "github.com/cloudevents/sdk-go/v2/test"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/eventtype"
"knative.dev/eventing/test/rekt/resources/subscription"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/resources/service"
)

func AutoCreateEventTypesOnIMC() *feature.Feature {
f := feature.NewFeature()

event := cetest.FullEvent()
event.SetType("test.custom.event.type")

sender := feature.MakeRandomK8sName("sender")
sub := feature.MakeRandomK8sName("subscription")
channelName := feature.MakeRandomK8sName("channel")
sink := feature.MakeRandomK8sName("sink")

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
f.Setup("install channel", channel_impl.Install(channelName))
f.Setup("install subscription", subscription.Install(sub,
subscription.WithChannel(channel_impl.AsRef(channelName)),
subscription.WithSubscriber(service.AsKReference(sink), ""),
))

f.Setup("subscription is ready", subscription.IsReady(sub))
f.Setup("channel is ready", channel_impl.IsReady(channelName))

f.Requirement("install event sender", eventshub.Install(sender,
eventshub.StartSenderToResource(channel_impl.GVR(), channelName),
eventshub.InputEvent(event),
))

expectedTypes := sets.New(event.Type())

f.Alpha("imc").
Must("deliver events to subscriber", assert.OnStore(sink).MatchEvent(cetest.HasId(event.ID())).AtLeast(1)).
Must("create event type", eventtype.WaitForEventType(eventtype.AssertPresent(expectedTypes)))

return f
}
Loading

0 comments on commit d783508

Please sign in to comment.