Skip to content

Commit 6ff7de7

Browse files
committed
fix merge conflicts
Signed-off-by: Calum Murray <[email protected]>
2 parents fb50b27 + bf945f9 commit 6ff7de7

File tree

4 files changed

+85
-5
lines changed

4 files changed

+85
-5
lines changed

pkg/channel/event_receiver.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
nethttp "net/http"
2424
"time"
2525

26+
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
27+
2628
"knative.dev/eventing/pkg/apis/feature"
2729

2830
"knative.dev/eventing/pkg/auth"
@@ -71,6 +73,7 @@ type EventReceiver struct {
7173
reporter StatsReporter
7274
tokenVerifier *auth.OIDCTokenVerifier
7375
audience string
76+
getPoliciesForFunc GetPoliciesForFunc
7477
withContext func(context.Context) context.Context
7578
}
7679

@@ -107,6 +110,16 @@ func ResolveChannelFromPath(PathToChannelFunc ResolveChannelFromPathFunc) EventR
107110
}
108111
}
109112

113+
// GetPoliciesForFunc function enables the EventReceiver to get the Channels AppliedEventPoliciesStatus
114+
type GetPoliciesForFunc func(channel ChannelReference) ([]duckv1.AppliedEventPolicyRef, error)
115+
116+
func ReceiverWithGetPoliciesForFunc(fn GetPoliciesForFunc) EventReceiverOptions {
117+
return func(r *EventReceiver) error {
118+
r.getPoliciesForFunc = fn
119+
return nil
120+
}
121+
}
122+
110123
func OIDCTokenVerification(tokenVerifier *auth.OIDCTokenVerifier, audience string) EventReceiverOptions {
111124
return func(r *EventReceiver) error {
112125
r.tokenVerifier = tokenVerifier
@@ -256,12 +269,26 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
256269
features := feature.FromContext(ctx)
257270
if features.IsOIDCAuthentication() {
258271
r.logger.Debug("OIDC authentication is enabled")
259-
err = r.tokenVerifier.VerifyJWTFromRequest(ctx, request, &r.audience, response)
272+
273+
if r.getPoliciesForFunc == nil {
274+
r.logger.Error("getPoliciesForFunc() callback not set. Can't get applying event policies of channel")
275+
response.WriteHeader(nethttp.StatusInternalServerError)
276+
return
277+
}
278+
279+
applyingEventPolicies, err := r.getPoliciesForFunc(channel)
280+
if err != nil {
281+
r.logger.Error("could not get applying event policies of channel", zap.Error(err), zap.String("channel", channel.String()))
282+
response.WriteHeader(nethttp.StatusInternalServerError)
283+
return
284+
}
285+
286+
err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, request, response)
260287
if err != nil {
261-
r.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
288+
r.logger.Warn("could not verify authn and authz of request", zap.Error(err))
262289
return
263290
}
264-
r.logger.Debug("Request contained a valid JWT. Continuing...")
291+
r.logger.Debug("Request contained a valid and authorized JWT. Continuing...")
265292
}
266293

267294
err = r.receiverFunc(request.Context(), channel, *event, utils.PassThroughHeaders(request.Header))

pkg/reconciler/inmemorychannel/dispatcher/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func NewController(
147147
eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider),
148148
tokenVerifier: auth.NewOIDCTokenVerifier(ctx, featureStore.Load()),
149149
clientConfig: clientConfig,
150+
inMemoryChannelLister: inmemorychannelInformer.Lister(),
150151
}
151152

152153
impl := inmemorychannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {

pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"context"
2121
"fmt"
2222

23+
listers "knative.dev/eventing/pkg/client/listers/messaging/v1"
24+
2325
"github.com/google/go-cmp/cmp"
2426
"github.com/google/go-cmp/cmp/cmpopts"
2527
"go.uber.org/zap"
@@ -55,12 +57,13 @@ type Reconciler struct {
5557
reporter channel.StatsReporter
5658
messagingClientSet messagingv1.MessagingV1Interface
5759
eventTypeLister v1beta2.EventTypeLister
60+
inMemoryChannelLister listers.InMemoryChannelLister
5861
eventingClient eventingv1beta2.EventingV1beta2Interface
5962
featureStore *feature.Store
6063
eventDispatcher *kncloudevents.Dispatcher
61-
tokenVerifier *auth.OIDCTokenVerifier
6264

63-
clientConfig eventingtls.ClientConfig
65+
tokenVerifier *auth.OIDCTokenVerifier
66+
clientConfig eventingtls.ClientConfig
6467
}
6568

6669
// Check the interfaces Reconciler should implement
@@ -133,6 +136,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
133136
r.eventDispatcher,
134137
channel.OIDCTokenVerification(r.tokenVerifier, audience(imc)),
135138
channel.ReceiverWithContextFunc(wc),
139+
channel.ReceiverWithGetPoliciesForFunc(r.getAppliedEventPolicyRef),
136140
)
137141
if err != nil {
138142
logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err)
@@ -165,6 +169,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
165169
channel.ResolveChannelFromPath(channel.ParseChannelFromPath),
166170
channel.OIDCTokenVerification(r.tokenVerifier, audience(imc)),
167171
channel.ReceiverWithContextFunc(wc),
172+
channel.ReceiverWithGetPoliciesForFunc(r.getAppliedEventPolicyRef),
168173
)
169174
if err != nil {
170175
logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err)
@@ -278,6 +283,15 @@ func (r *Reconciler) deleteFunc(obj interface{}) {
278283
handleSubscribers(imc.Spec.Subscribers, kncloudevents.DeleteAddressableHandler)
279284
}
280285

286+
func (r *Reconciler) getAppliedEventPolicyRef(channel channel.ChannelReference) ([]eventingduckv1.AppliedEventPolicyRef, error) {
287+
imc, err := r.inMemoryChannelLister.InMemoryChannels(channel.Namespace).Get(channel.Name)
288+
if err != nil {
289+
return nil, fmt.Errorf("could not get inmemory channel %s/%s: %w", channel.Namespace, channel.Name, err)
290+
}
291+
292+
return imc.Status.Policies, nil
293+
}
294+
281295
func handleSubscribers(subscribers []eventingduckv1.SubscriberSpec, handle func(duckv1.Addressable)) {
282296
for _, sub := range subscribers {
283297
handle(duckv1.Addressable{

test/rekt/channel_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"testing"
2424
"time"
2525

26+
"knative.dev/eventing/test/rekt/features/authz"
2627
"knative.dev/reconciler-test/pkg/feature"
2728

2829
"github.com/cloudevents/sdk-go/v2/binding"
@@ -39,6 +40,7 @@ import (
3940
"knative.dev/eventing/test/rekt/features/channel"
4041
"knative.dev/eventing/test/rekt/features/oidc"
4142
ch "knative.dev/eventing/test/rekt/resources/channel"
43+
channelresource "knative.dev/eventing/test/rekt/resources/channel"
4244
"knative.dev/eventing/test/rekt/resources/channel_impl"
4345
"knative.dev/eventing/test/rekt/resources/subscription"
4446
)
@@ -392,3 +394,39 @@ func TestChannelImplSupportsOIDC(t *testing.T) {
392394

393395
env.TestSet(ctx, t, oidc.AddressableOIDCConformance(channel_impl.GVR(), channel_impl.GVK().Kind, name, env.Namespace()))
394396
}
397+
398+
func TestChannelImplSupportsAuthZ(t *testing.T) {
399+
t.Parallel()
400+
401+
ctx, env := global.Environment(
402+
knative.WithKnativeNamespace(system.Namespace()),
403+
knative.WithLoggingConfig,
404+
knative.WithTracingConfig,
405+
k8s.WithEventListener,
406+
environment.Managed(t),
407+
eventshub.WithTLS(t),
408+
)
409+
410+
name := feature.MakeRandomK8sName("channelimpl")
411+
env.Prerequisite(ctx, t, channel.ImplGoesReady(name))
412+
413+
env.TestSet(ctx, t, authz.AddressableAuthZConformance(channel_impl.GVR(), channel_impl.GVK().Kind, name))
414+
}
415+
416+
func TestChannelSupportsAuthZ(t *testing.T) {
417+
t.Parallel()
418+
419+
ctx, env := global.Environment(
420+
knative.WithKnativeNamespace(system.Namespace()),
421+
knative.WithLoggingConfig,
422+
knative.WithTracingConfig,
423+
k8s.WithEventListener,
424+
environment.Managed(t),
425+
eventshub.WithTLS(t),
426+
)
427+
428+
name := feature.MakeRandomK8sName("channel")
429+
env.Prerequisite(ctx, t, channel.GoesReady(name))
430+
431+
env.TestSet(ctx, t, authz.AddressableAuthZConformance(channelresource.GVR(), "Channel", name))
432+
}

0 commit comments

Comments
 (0)