From 4bd5aef19941b9c462786aeced693cbc2b0b1d3b Mon Sep 17 00:00:00 2001 From: Cali0707 Date: Sun, 22 Sep 2024 20:21:57 -0400 Subject: [PATCH] refactor: pass configmap watcher to auth verifier constructor instead of feature flags Signed-off-by: Cali0707 --- cmd/broker/filter/main.go | 3 +-- cmd/broker/ingress/main.go | 3 +-- cmd/jobsink/main.go | 9 ++----- pkg/auth/verifier.go | 19 +++++++++++--- pkg/broker/filter/filter_handler.go | 6 ++--- pkg/broker/filter/filter_handler_test.go | 26 +++++++++++++++++-- pkg/broker/ingress/ingress_handler.go | 6 ++--- pkg/broker/ingress/ingress_handler_test.go | 14 +++++++++- .../inmemorychannel/dispatcher/controller.go | 3 +-- 9 files changed, 64 insertions(+), 25 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 075ec8ba29d..e491670f934 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -136,7 +136,6 @@ func main() { } handler.EventTypeCreator = autoCreate } - handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags) }) featureStore.WatchConfigs(configMapWatcher) @@ -156,7 +155,7 @@ func main() { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher) handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapLister, ctxFunc) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index db5ddb754fd..bd8e76d8fea 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -159,7 +159,6 @@ func main() { } handler.EvenTypeHandler = autoCreate } - handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags) }) featureStore.WatchConfigs(configMapWatcher) @@ -171,7 +170,7 @@ func main() { reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher) handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapLister, ctxFunc) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index 1c953b50a5e..a79cf5d7655 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -114,12 +114,7 @@ func main() { trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) var h *Handler - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { - logger.Info("Updated", zap.String("name", name), zap.Any("value", value)) - if flags, ok := value.(feature.Flags); ok && h != nil { - h.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, flags) - } - }) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) featureStore.WatchConfigs(configMapWatcher) // Decorate contexts with the current state of the feature config. @@ -131,7 +126,7 @@ func main() { k8s: kubeclient.Get(ctx), lister: jobsink.Get(ctx).Lister(), withContext: ctxFunc, - authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()), + authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher), } tlsConfig, err := getServerTLSConfig(ctx) diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index 85027eac04b..c76c9df8b8d 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -30,6 +30,7 @@ import ( "go.opencensus.io/plugin/ochttp" corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/eventing/pkg/eventingtls" + "knative.dev/pkg/configmap" "knative.dev/pkg/network" "knative.dev/pkg/tracing/propagation/tracecontextb3" @@ -65,7 +66,7 @@ type IDToken struct { AccessTokenHash string } -func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, features feature.Flags) *Verifier { +func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, cmw configmap.Watcher) *Verifier { tokenHandler := &Verifier{ logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), restConfig: injection.GetConfig(ctx), @@ -73,7 +74,16 @@ func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1. trustBundleConfigMapLister: trustBundleConfigMapLister, } - if err := tokenHandler.initOIDCProvider(ctx, features); err != nil { + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if features, ok := value.(feature.Flags); ok { + if err := tokenHandler.initOIDCProvider(ctx, features); err != nil { + tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider after config update. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err)) + } + } + }) + featureStore.WatchConfigs(cmw) + + if err := tokenHandler.initOIDCProvider(ctx, featureStore.Load()); err != nil { tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err)) } @@ -243,11 +253,14 @@ func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) ctx = oidc.ClientContext(ctx, httpClient) // get OIDC provider - v.provider, err = oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL()) + provider, err := oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL()) if err != nil { return fmt.Errorf("could not get OIDC provider: %w", err) } + // provider is valid, update it + v.provider = provider + v.logger.Debug("updated OIDC provider config", zap.Any("discovery-config", discovery)) return nil diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 78fb902c5ef..08ddf1c1a4f 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -90,7 +90,7 @@ type Handler struct { logger *zap.Logger withContext func(ctx context.Context) context.Context filtersMap *subscriptionsapi.FiltersMap - TokenVerifier *auth.Verifier + tokenVerifier *auth.Verifier EventTypeCreator *eventtype.EventTypeAutoHandler } @@ -153,7 +153,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.Verifier, oidcTokenProvi brokerLister: brokerInformer.Lister(), subscriptionLister: subscriptionInformer.Lister(), logger: logger, - TokenVerifier: tokenVerifier, + tokenVerifier: tokenVerifier, withContext: wc, filtersMap: fm, }, nil @@ -225,7 +225,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } subscriptionFullIdentity := fmt.Sprintf("system:serviceaccount:%s:%s", subscription.Namespace, *subscription.Status.Auth.ServiceAccountName) - err = h.TokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer) + err = h.tokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer) if err != nil { h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 4d1852002ac..b15f2fde3df 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -29,6 +29,7 @@ import ( "knative.dev/eventing/pkg/eventingtls" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + "knative.dev/pkg/configmap" "knative.dev/pkg/system" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" @@ -42,6 +43,7 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zaptest" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "knative.dev/pkg/apis" @@ -451,7 +453,17 @@ func TestReceiver(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + feature.OIDCAuthentication: string(feature.Enabled), + }, + }, + )) for _, trig := range tc.triggers { // Replace the SubscriberURI to point at our fake server. @@ -661,7 +673,17 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + feature.OIDCAuthentication: string(feature.Enabled), + }, + }, + )) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index c183832b8ba..bdb817e6796 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -73,7 +73,7 @@ type Handler struct { eventDispatcher *kncloudevents.Dispatcher - TokenVerifier *auth.Verifier + tokenVerifier *auth.Verifier withContext func(ctx context.Context) context.Context } @@ -128,7 +128,7 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve Logger: logger, BrokerLister: brokerInformer.Lister(), eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider), - TokenVerifier: tokenVerifier, + tokenVerifier: tokenVerifier, withContext: withContext, }, nil } @@ -237,7 +237,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { if broker.Status.Address != nil { audience = broker.Status.Address.Audience } - err = h.TokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer) + err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer) if err != nil { h.Logger.Warn("Failed to verify AuthN and AuthZ.", zap.Error(err)) return diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index 991566e2182..f30174c1476 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -36,10 +36,12 @@ import ( cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/google/go-cmp/cmp" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/configmap" reconcilertesting "knative.dev/pkg/reconciler/testing" @@ -299,7 +301,17 @@ func TestHandler_ServeHTTP(t *testing.T) { } tokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContextOrDefaults(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + feature.OIDCAuthentication: string(feature.Enabled), + }, + }, + )) h, err := NewHandler(logger, &mockReporter{}, diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index 8ee3af7da34..2eac2c0771c 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -146,7 +146,7 @@ func NewController( eventingClient: eventingclient.Get(ctx).EventingV1beta2(), eventTypeLister: eventtypeinformer.Get(ctx).Lister(), eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider), - authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()), + authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, cmw), clientConfig: clientConfig, inMemoryChannelLister: inmemorychannelInformer.Lister(), } @@ -156,7 +156,6 @@ func NewController( }) globalResync = func(_ interface{}) { - r.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) impl.GlobalResync(inmemorychannelInformer.Informer()) }