|
1 | 1 | /*
|
2 |
| -Copyright 2022 The Knative Authors |
| 2 | +Copyright 2023 The Knative Authors |
3 | 3 |
|
4 | 4 | Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | you may not use this file except in compliance with the License.
|
@@ -27,103 +27,69 @@ import (
|
27 | 27 | "knative.dev/reconciler-test/pkg/manifest"
|
28 | 28 | "knative.dev/reconciler-test/pkg/resources/service"
|
29 | 29 |
|
| 30 | + "github.com/cloudevents/sdk-go/v2/event" |
30 | 31 | eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
|
31 | 32 | "knative.dev/eventing/test/rekt/resources/broker"
|
32 | 33 | "knative.dev/eventing/test/rekt/resources/trigger"
|
33 | 34 | )
|
34 | 35 |
|
35 |
| -// FiltersFeatureSet creates a feature set for testing the broker implementation of the new trigger filters experimental feature |
36 |
| -// (aka Cloud Events Subscriptions API filters). It requires a created and ready Broker resource with brokerName. |
37 |
| -// |
38 |
| -// The feature set tests four filter dialects: exact, prefix, suffix and cesql (aka CloudEvents SQL). |
39 |
| -func FiltersFeatureSet(brokerName string) *feature.FeatureSet { |
40 |
| - matchedEvent := FullEvent() |
41 |
| - unmatchedEvent := MinEvent() |
42 |
| - unmatchedEvent.SetType("org.wrong.type") |
43 |
| - unmatchedEvent.SetSource("org.wrong.source") |
44 |
| - |
45 |
| - features := make([]*feature.Feature, 0, 8) |
46 |
| - tests := map[string]struct { |
47 |
| - filters []eventingv1.SubscriptionsAPIFilter |
48 |
| - step feature.StepFn |
49 |
| - }{ |
50 |
| - "Exact filter": { |
51 |
| - filters: []eventingv1.SubscriptionsAPIFilter{ |
52 |
| - { |
53 |
| - Exact: map[string]string{ |
54 |
| - "type": matchedEvent.Type(), |
55 |
| - "source": matchedEvent.Source(), |
56 |
| - }, |
57 |
| - }, |
58 |
| - }, |
59 |
| - }, |
60 |
| - "Prefix filter": { |
61 |
| - filters: []eventingv1.SubscriptionsAPIFilter{ |
62 |
| - { |
63 |
| - Prefix: map[string]string{ |
64 |
| - "type": matchedEvent.Type()[:4], |
65 |
| - "source": matchedEvent.Source()[:4], |
66 |
| - }, |
67 |
| - }, |
68 |
| - }, |
69 |
| - }, |
70 |
| - "Suffix filter": { |
71 |
| - filters: []eventingv1.SubscriptionsAPIFilter{ |
72 |
| - { |
73 |
| - Suffix: map[string]string{ |
74 |
| - "type": matchedEvent.Type()[5:], |
75 |
| - "source": matchedEvent.Source()[5:], |
76 |
| - }, |
77 |
| - }, |
78 |
| - }, |
79 |
| - }, |
80 |
| - "CloudEvents SQL filter": { |
81 |
| - filters: []eventingv1.SubscriptionsAPIFilter{ |
82 |
| - { |
83 |
| - CESQL: fmt.Sprintf("type = '%s' AND source = '%s'", matchedEvent.Type(), matchedEvent.Source()), |
84 |
| - }, |
85 |
| - }, |
86 |
| - }, |
87 |
| - } |
| 36 | +func newEventFilterFeature(eventContexts []CloudEventsContext, filters []eventingv1.SubscriptionsAPIFilter, f *feature.Feature, brokerName string) *feature.Feature { |
| 37 | + subscriberName := feature.MakeRandomK8sName("subscriber") |
| 38 | + triggerName := feature.MakeRandomK8sName("trigger") |
88 | 39 |
|
89 |
| - for name, fs := range tests { |
90 |
| - matchedSender := feature.MakeRandomK8sName("sender") |
91 |
| - unmatchedSender := feature.MakeRandomK8sName("sender") |
92 |
| - subscriber := feature.MakeRandomK8sName("subscriber") |
93 |
| - triggerName := feature.MakeRandomK8sName("viaTrigger") |
| 40 | + f.Setup("Install trigger subscriber", eventshub.Install(subscriberName, eventshub.StartReceiver)) |
94 | 41 |
|
95 |
| - f := feature.NewFeatureNamed(name) |
| 42 | + cfg := []manifest.CfgFn{ |
| 43 | + trigger.WithSubscriber(service.AsKReference(subscriberName), ""), |
| 44 | + trigger.WithNewFilters(filters), |
| 45 | + } |
96 | 46 |
|
97 |
| - f.Setup("Install trigger subscriber", eventshub.Install(subscriber, eventshub.StartReceiver)) |
| 47 | + f.Setup("Install trigger", trigger.Install(triggerName, brokerName, cfg...)) |
| 48 | + f.Setup("Wait for trigger to become ready", trigger.IsReady(triggerName)) |
| 49 | + f.Setup("Broker is addressable", k8s.IsAddressable(broker.GVR(), brokerName)) |
98 | 50 |
|
99 |
| - // Set the Trigger subscriber. |
100 |
| - cfg := []manifest.CfgFn{ |
101 |
| - trigger.WithSubscriber(service.AsKReference(subscriber), ""), |
102 |
| - trigger.WithNewFilters(fs.filters), |
103 |
| - } |
| 51 | + asserter := f.Alpha("New filters") |
104 | 52 |
|
105 |
| - f.Setup("Install trigger", trigger.Install(triggerName, brokerName, cfg...)) |
106 |
| - f.Setup("Wait for trigger to become ready", trigger.IsReady(triggerName)) |
107 |
| - f.Setup("Broker is addressable", k8s.IsAddressable(broker.GVR(), brokerName)) |
| 53 | + for _, eventCtx := range eventContexts { |
| 54 | + event := newEventFromEventContext(eventCtx) |
| 55 | + eventSender := feature.MakeRandomK8sName("sender") |
108 | 56 |
|
109 |
| - f.Requirement("Install matched event sender", eventshub.Install(matchedSender, |
| 57 | + f.Requirement(fmt.Sprintf("Install event sender %s", eventSender), eventshub.Install(eventSender, |
110 | 58 | eventshub.StartSenderToResource(broker.GVR(), brokerName),
|
111 |
| - eventshub.InputEvent(matchedEvent)), |
112 |
| - ) |
| 59 | + eventshub.InputEvent(event), |
| 60 | + )) |
113 | 61 |
|
114 |
| - f.Requirement("Install unmatched event sender", eventshub.Install(unmatchedSender, |
115 |
| - eventshub.StartSenderToResource(broker.GVR(), brokerName), |
116 |
| - eventshub.InputEvent(unmatchedEvent)), |
117 |
| - ) |
118 |
| - |
119 |
| - f.Alpha("Triggers with new filters"). |
120 |
| - Must("must deliver matched events", OnStore(subscriber).MatchEvent(HasId(matchedEvent.ID())).AtLeast(1)). |
121 |
| - MustNot("must not deliver unmatched events", OnStore(subscriber).MatchEvent(HasId(unmatchedEvent.ID())).Not()) |
122 |
| - features = append(features, f) |
| 62 | + if eventCtx.shouldDeliver { |
| 63 | + asserter.Must("must deliver matched event", OnStore(subscriberName).MatchEvent(HasId(event.ID())).AtLeast(1)) |
| 64 | + } else { |
| 65 | + asserter.MustNot("must not deliver unmatched event", OnStore(subscriberName).MatchEvent(HasId(event.ID())).Not()) |
| 66 | + } |
123 | 67 | }
|
124 | 68 |
|
125 |
| - return &feature.FeatureSet{ |
126 |
| - Name: "New trigger filters", |
127 |
| - Features: features, |
| 69 | + return f |
| 70 | +} |
| 71 | + |
| 72 | +func newEventFromEventContext(eventCtx CloudEventsContext) event.Event { |
| 73 | + event := MinEvent() |
| 74 | + // Ensure that each event has a unique ID |
| 75 | + event.SetID(feature.MakeRandomK8sName("event")) |
| 76 | + if eventCtx.eventType != "" { |
| 77 | + event.SetType(eventCtx.eventType) |
| 78 | + } |
| 79 | + if eventCtx.eventSource != "" { |
| 80 | + event.SetSource(eventCtx.eventSource) |
| 81 | + } |
| 82 | + if eventCtx.eventSubject != "" { |
| 83 | + event.SetSubject(eventCtx.eventSubject) |
| 84 | + } |
| 85 | + if eventCtx.eventID != "" { |
| 86 | + event.SetID(eventCtx.eventID) |
| 87 | + } |
| 88 | + if eventCtx.eventDataSchema != "" { |
| 89 | + event.SetDataSchema(eventCtx.eventDataSchema) |
| 90 | + } |
| 91 | + if eventCtx.eventDataContentType != "" { |
| 92 | + event.SetDataContentType(eventCtx.eventDataContentType) |
128 | 93 | }
|
| 94 | + return event |
129 | 95 | }
|
0 commit comments