From 837d926f569a8de9c339e688e0f60860c5888adc Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 9 Aug 2024 12:47:10 -0400 Subject: [PATCH 1/5] feat: make oidc discovery url configurable Signed-off-by: Calum Murray --- cmd/broker/filter/main.go | 4 ++-- cmd/broker/ingress/main.go | 3 ++- cmd/jobsink/main.go | 9 ++++++-- pkg/apis/feature/features.go | 19 ++++++++++++++++- pkg/apis/feature/features_test.go | 2 ++ pkg/apis/feature/flag_names.go | 1 + .../feature/testdata/config-features.yaml | 1 + pkg/auth/verifier.go | 20 +++++++----------- pkg/broker/filter/filter_handler.go | 6 +++--- pkg/broker/filter/filter_handler_test.go | 4 ++-- pkg/broker/ingress/ingress_handler.go | 6 +++--- pkg/broker/ingress/ingress_handler_test.go | 3 ++- .../inmemorychannel/dispatcher/controller.go | 21 ++++++++++--------- 13 files changed, 62 insertions(+), 37 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 0ccb24ccb16..42b86389579 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -134,7 +134,7 @@ func main() { } handler.EventTypeCreator = autoCreate } - + handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureFlags) }) featureStore.WatchConfigs(configMapWatcher) @@ -154,7 +154,7 @@ func main() { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()) trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc) if err != nil { diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 1b1b5c852b9..7192f2df967 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -157,6 +157,7 @@ func main() { } handler.EvenTypeHandler = autoCreate } + handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureFlags) }) featureStore.WatchConfigs(configMapWatcher) @@ -168,7 +169,7 @@ func main() { reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()) trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapInformer, ctxFunc) if err != nil { diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index d06dafdfd84..203f257c7b2 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -104,8 +104,13 @@ func main() { logger.Info("Starting the JobSink Ingress") + var h *Handler + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { logger.Info("Updated", zap.String("name", name), zap.Any("value", value)) + if flags, ok := value.(feature.Flags); ok && h != nil { + h.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), flags) + } }) featureStore.WatchConfigs(configMapWatcher) @@ -114,11 +119,11 @@ func main() { return logging.WithLogger(featureStore.ToContext(ctx), sl) } - h := &Handler{ + h = &Handler{ k8s: kubeclient.Get(ctx), lister: jobsink.Get(ctx).Lister(), withContext: ctxFunc, - authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister()), + authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()), } tlsConfig, err := getServerTLSConfig(ctx) diff --git a/pkg/apis/feature/features.go b/pkg/apis/feature/features.go index b2ca380fc77..98eeb2907ca 100644 --- a/pkg/apis/feature/features.go +++ b/pkg/apis/feature/features.go @@ -64,6 +64,9 @@ const ( // This configuration is applied when there is no EventPolicy with a "to" referencing a given // resource. AuthorizationAllowSameNamespace Flag = "Allow-Same-Namespace" + + // DefaultOIDCDiscoveryURL is the default OIDC Discovery URL used in most Kubernetes clusters. + DefaultOIDCDiscoveryBaseURL Flag = "https://kubernetes.default.svc" ) // Flags is a map containing all the enabled/disabled flags for the experimental features. @@ -81,6 +84,7 @@ func newDefaults() Flags { EvenTypeAutoCreate: Disabled, NewAPIServerFilters: Disabled, AuthorizationDefaultMode: AuthorizationAllowSameNamespace, + OIDCDiscoveryBaseURL: DefaultOIDCDiscoveryBaseURL, } } @@ -134,6 +138,19 @@ func (e Flags) IsAuthorizationDefaultModeSameNamespace() bool { return e != nil && e[AuthorizationDefaultMode] == AuthorizationAllowSameNamespace } +func (e Flags) OIDCDiscoveryBaseURL() string { + if e == nil { + return string(DefaultOIDCDiscoveryBaseURL) + } + + discoveryUrl, ok := e[OIDCDiscoveryBaseURL] + if !ok { + return string(DefaultOIDCDiscoveryBaseURL) + } + + return string(discoveryUrl) +} + func (e Flags) String() string { return fmt.Sprintf("%+v", map[string]Flag(e)) } @@ -183,7 +200,7 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) { flags[sanitizedKey] = AuthorizationDenyAll } else if sanitizedKey == AuthorizationDefaultMode && strings.EqualFold(v, string(AuthorizationAllowSameNamespace)) { flags[sanitizedKey] = AuthorizationAllowSameNamespace - } else if strings.Contains(k, NodeSelectorLabel) { + } else if strings.Contains(k, NodeSelectorLabel) || sanitizedKey == OIDCDiscoveryBaseURL { flags[sanitizedKey] = Flag(v) } else { flags[k] = Flag(v) diff --git a/pkg/apis/feature/features_test.go b/pkg/apis/feature/features_test.go index 5b7caa8404a..34d899d2209 100644 --- a/pkg/apis/feature/features_test.go +++ b/pkg/apis/feature/features_test.go @@ -61,6 +61,8 @@ func TestGetFlags(t *testing.T) { nodeSelector := flags.NodeSelector() expectedNodeSelector := map[string]string{"testkey": "testvalue", "testkey1": "testvalue1", "testkey2": "testvalue2"} require.Equal(t, expectedNodeSelector, nodeSelector) + + require.Equal(t, flags.OIDCDiscoveryBaseURL(), "https://oidc.eks.eu-west-1.amazonaws.com/id/1") } func TestShouldNotOverrideDefaults(t *testing.T) { diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go index 6a7579fb710..e21056eb444 100644 --- a/pkg/apis/feature/flag_names.go +++ b/pkg/apis/feature/flag_names.go @@ -28,4 +28,5 @@ const ( CrossNamespaceEventLinks = "cross-namespace-event-links" NewAPIServerFilters = "new-apiserversource-filters" AuthorizationDefaultMode = "default-authorization-mode" + OIDCDiscoveryBaseURL = "oidc-discovery-base-url" ) diff --git a/pkg/apis/feature/testdata/config-features.yaml b/pkg/apis/feature/testdata/config-features.yaml index 6c3252ba429..bebbfea4b79 100644 --- a/pkg/apis/feature/testdata/config-features.yaml +++ b/pkg/apis/feature/testdata/config-features.yaml @@ -29,3 +29,4 @@ data: apiserversources-nodeselector-testkey: testvalue apiserversources-nodeselector-testkey1: testvalue1 apiserversources-nodeselector-testkey2: testvalue2 + oidc-discovery-base-url: "https://oidc.eks.eu-west-1.amazonaws.com/id/1" diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index 3cf9c436bc8..82df185fcd9 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -41,10 +41,6 @@ import ( "knative.dev/pkg/logging" ) -const ( - kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc" -) - type Verifier struct { logger *zap.SugaredLogger restConfig *rest.Config @@ -61,14 +57,14 @@ type IDToken struct { AccessTokenHash string } -func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister) *Verifier { +func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, features feature.Flags) *Verifier { tokenHandler := &Verifier{ logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), restConfig: injection.GetConfig(ctx), eventPolicyLister: eventPolicyLister, } - if err := tokenHandler.initOIDCProvider(ctx); err != nil { + if err := tokenHandler.initOIDCProvider(ctx, features); err != nil { tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err)) } @@ -219,13 +215,13 @@ func (v *Verifier) verifyJWT(ctx context.Context, jwt, audience string) (*IDToke }, nil } -func (v *Verifier) initOIDCProvider(ctx context.Context) error { - discovery, err := v.getKubernetesOIDCDiscovery() +func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) error { + discovery, err := v.getKubernetesOIDCDiscovery(features) if err != nil { return fmt.Errorf("could not load Kubernetes OIDC discovery information: %w", err) } - if discovery.Issuer != kubernetesOIDCDiscoveryBaseURL { + if discovery.Issuer != features.OIDCDiscoveryBaseURL() { // in case we have another issuer as the api server: ctx = oidc.InsecureIssuerURLContext(ctx, discovery.Issuer) } @@ -237,7 +233,7 @@ func (v *Verifier) initOIDCProvider(ctx context.Context) error { ctx = oidc.ClientContext(ctx, httpClient) // get OIDC provider - v.provider, err = oidc.NewProvider(ctx, kubernetesOIDCDiscoveryBaseURL) + v.provider, err = oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL()) if err != nil { return fmt.Errorf("could not get OIDC provider: %w", err) } @@ -256,13 +252,13 @@ func (v *Verifier) getHTTPClientForKubeAPIServer() (*http.Client, error) { return client, nil } -func (v *Verifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error) { +func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags) (*openIDMetadata, error) { client, err := v.getHTTPClientForKubeAPIServer() if err != nil { return nil, fmt.Errorf("could not get HTTP client for API server: %w", err) } - resp, err := client.Get(kubernetesOIDCDiscoveryBaseURL + "/.well-known/openid-configuration") + resp, err := client.Get(features.OIDCDiscoveryBaseURL() + "/.well-known/openid-configuration") if err != nil { return nil, fmt.Errorf("could not get response: %w", err) } diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 08ddf1c1a4f..78fb902c5ef 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -90,7 +90,7 @@ type Handler struct { logger *zap.Logger withContext func(ctx context.Context) context.Context filtersMap *subscriptionsapi.FiltersMap - tokenVerifier *auth.Verifier + TokenVerifier *auth.Verifier EventTypeCreator *eventtype.EventTypeAutoHandler } @@ -153,7 +153,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.Verifier, oidcTokenProvi brokerLister: brokerInformer.Lister(), subscriptionLister: subscriptionInformer.Lister(), logger: logger, - tokenVerifier: tokenVerifier, + TokenVerifier: tokenVerifier, withContext: wc, filtersMap: fm, }, nil @@ -225,7 +225,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } subscriptionFullIdentity := fmt.Sprintf("system:serviceaccount:%s:%s", subscription.Namespace, *subscription.Status.Auth.ServiceAccountName) - err = h.tokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer) + err = h.TokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer) if err != nil { h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 6f020d8870d..dd12b93ebfb 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -444,7 +444,7 @@ func TestReceiver(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContext(ctx)) for _, trig := range tc.triggers { // Replace the SubscriberURI to point at our fake server. @@ -653,7 +653,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContext(ctx)) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index bdb817e6796..c183832b8ba 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -73,7 +73,7 @@ type Handler struct { eventDispatcher *kncloudevents.Dispatcher - tokenVerifier *auth.Verifier + TokenVerifier *auth.Verifier withContext func(ctx context.Context) context.Context } @@ -128,7 +128,7 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve Logger: logger, BrokerLister: brokerInformer.Lister(), eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider), - tokenVerifier: tokenVerifier, + TokenVerifier: tokenVerifier, withContext: withContext, }, nil } @@ -237,7 +237,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { if broker.Status.Address != nil { audience = broker.Status.Address.Audience } - err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer) + err = h.TokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer) if err != nil { h.Logger.Warn("Failed to verify AuthN and AuthZ.", zap.Error(err)) return diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index 37982bd27ac..3956bc6dcd0 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -40,6 +40,7 @@ import ( "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/broker" @@ -291,7 +292,7 @@ func TestHandler_ServeHTTP(t *testing.T) { } tokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContextOrDefaults(ctx)) h, err := NewHandler(logger, &mockReporter{}, diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index a919ef7a2d7..2c53e2ed9a3 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -130,6 +130,15 @@ func NewController( TrustBundleConfigMapLister: trustBundleConfigMapInformer.Lister().ConfigMaps(system.Namespace()), } + var globalResync func(obj interface{}) + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(_ string, _ interface{}) { + if globalResync != nil { + globalResync(nil) + } + }) + + featureStore.WatchConfigs(cmw) r := &Reconciler{ multiChannelEventHandler: sh, reporter: reporter, @@ -137,25 +146,17 @@ func NewController( eventingClient: eventingclient.Get(ctx).EventingV1beta2(), eventTypeLister: eventtypeinformer.Get(ctx).Lister(), eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider), - authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister()), + authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()), clientConfig: clientConfig, inMemoryChannelLister: inmemorychannelInformer.Lister(), } - var globalResync func(obj interface{}) - - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(_ string, _ interface{}) { - if globalResync != nil { - globalResync(nil) - } - }) - featureStore.WatchConfigs(cmw) - impl := inmemorychannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { return controller.Options{SkipStatusUpdates: true, FinalizerName: finalizerName, ConfigStore: featureStore} }) globalResync = func(_ interface{}) { + r.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()) impl.GlobalResync(inmemorychannelInformer.Informer()) } From eee13207e67235355d6ad3c2674970cedac9f2d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 17 Sep 2024 16:00:16 +0200 Subject: [PATCH 2/5] Use Trustbundles and public CAs, when issuer URL is not kubernetes.default.svc --- cmd/broker/filter/main.go | 9 +-- cmd/broker/ingress/main.go | 9 +-- cmd/jobsink/main.go | 12 +++- pkg/auth/verifier.go | 69 ++++++++++++++----- pkg/broker/filter/filter_handler_test.go | 21 ++++-- pkg/broker/ingress/ingress_handler_test.go | 16 ++++- .../inmemorychannel/dispatcher/controller.go | 8 +-- 7 files changed, 107 insertions(+), 37 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 42b86389579..075ec8ba29d 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -120,6 +120,8 @@ func main() { // Watch the observability config map and dynamically update request logs. configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(sl, atomicLevel, component)) + trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) + var featureStore *feature.Store var handler *filter.Handler @@ -134,7 +136,7 @@ func main() { } handler.EventTypeCreator = autoCreate } - handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureFlags) + handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags) }) featureStore.WatchConfigs(configMapWatcher) @@ -154,9 +156,8 @@ func main() { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()) - trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) - handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) + handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapLister, ctxFunc) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) } diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 7192f2df967..db5ddb754fd 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -143,6 +143,8 @@ func main() { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } + trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) + var featureStore *feature.Store var handler *ingress.Handler @@ -157,7 +159,7 @@ func main() { } handler.EvenTypeHandler = autoCreate } - handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureFlags) + handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags) }) featureStore.WatchConfigs(configMapWatcher) @@ -169,9 +171,8 @@ func main() { reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()) - trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) - handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapInformer, ctxFunc) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) + handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapLister, ctxFunc) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) } diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index 203f257c7b2..1c953b50a5e 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -25,6 +25,9 @@ import ( "net/http" "strings" + configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered" + filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + "github.com/cloudevents/sdk-go/v2/binding" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "go.uber.org/zap" @@ -70,9 +73,13 @@ func main() { cfg := injection.ParseAndGetRESTConfigOrDie() ctx = injection.WithConfig(ctx, cfg) + ctx = filteredFactory.WithSelectors(ctx, + eventingtls.TrustBundleLabelSelector, + ) ctx, informers := injection.Default.SetupInformers(ctx, cfg) ctx = injection.WithConfig(ctx, cfg) + loggingConfig, err := cmdbroker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName()) if err != nil { log.Fatal("Error loading/parsing logging configuration:", err) @@ -104,12 +111,13 @@ func main() { logger.Info("Starting the JobSink Ingress") + trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) var h *Handler featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { logger.Info("Updated", zap.String("name", name), zap.Any("value", value)) if flags, ok := value.(feature.Flags); ok && h != nil { - h.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), flags) + h.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, flags) } }) featureStore.WatchConfigs(configMapWatcher) @@ -123,7 +131,7 @@ func main() { k8s: kubeclient.Get(ctx), lister: jobsink.Get(ctx).Lister(), withContext: ctxFunc, - authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()), + authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()), } tlsConfig, err := getServerTLSConfig(ctx) diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index 82df185fcd9..85027eac04b 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -22,10 +22,17 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "strings" "time" + "go.opencensus.io/plugin/ochttp" + corev1listers "k8s.io/client-go/listers/core/v1" + "knative.dev/eventing/pkg/eventingtls" + "knative.dev/pkg/network" + "knative.dev/pkg/tracing/propagation/tracecontextb3" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" @@ -42,10 +49,11 @@ import ( ) type Verifier struct { - logger *zap.SugaredLogger - restConfig *rest.Config - provider *oidc.Provider - eventPolicyLister v1alpha1.EventPolicyLister + logger *zap.SugaredLogger + restConfig *rest.Config + provider *oidc.Provider + eventPolicyLister v1alpha1.EventPolicyLister + trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister } type IDToken struct { @@ -57,11 +65,12 @@ type IDToken struct { AccessTokenHash string } -func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, features feature.Flags) *Verifier { +func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, features feature.Flags) *Verifier { tokenHandler := &Verifier{ - logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), - restConfig: injection.GetConfig(ctx), - eventPolicyLister: eventPolicyLister, + logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), + restConfig: injection.GetConfig(ctx), + eventPolicyLister: eventPolicyLister, + trustBundleConfigMapLister: trustBundleConfigMapLister, } if err := tokenHandler.initOIDCProvider(ctx, features); err != nil { @@ -216,7 +225,12 @@ func (v *Verifier) verifyJWT(ctx context.Context, jwt, audience string) (*IDToke } func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) error { - discovery, err := v.getKubernetesOIDCDiscovery(features) + httpClient, err := v.getHTTPClient(features) + if err != nil { + return fmt.Errorf("could not get HTTP client: %w", err) + } + + discovery, err := v.getKubernetesOIDCDiscovery(features, httpClient) if err != nil { return fmt.Errorf("could not load Kubernetes OIDC discovery information: %w", err) } @@ -226,10 +240,6 @@ func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) ctx = oidc.InsecureIssuerURLContext(ctx, discovery.Issuer) } - httpClient, err := v.getHTTPClientForKubeAPIServer() - if err != nil { - return fmt.Errorf("could not get HTTP client with TLS certs of API server: %w", err) - } ctx = oidc.ClientContext(ctx, httpClient) // get OIDC provider @@ -252,12 +262,37 @@ func (v *Verifier) getHTTPClientForKubeAPIServer() (*http.Client, error) { return client, nil } -func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags) (*openIDMetadata, error) { - client, err := v.getHTTPClientForKubeAPIServer() - if err != nil { - return nil, fmt.Errorf("could not get HTTP client for API server: %w", err) +func (v *Verifier) getHTTPClient(features feature.Flags) (*http.Client, error) { + if features.OIDCDiscoveryBaseURL() == "https://kubernetes.default.svc" { + return v.getHTTPClientForKubeAPIServer() + } + + var base = http.DefaultTransport.(*http.Transport).Clone() + + clientConfig := eventingtls.ClientConfig{ + TrustBundleConfigMapLister: v.trustBundleConfigMapLister, } + base.DialTLSContext = func(ctx context.Context, net, addr string) (net.Conn, error) { + tlsConfig, err := eventingtls.GetTLSClientConfig(clientConfig) + if err != nil { + return nil, fmt.Errorf("could not get tls client config: %w", err) + } + return network.DialTLSWithBackOff(ctx, net, addr, tlsConfig) + } + + client := &http.Client{ + // Add output tracing. + Transport: &ochttp.Transport{ + Base: base, + Propagation: tracecontextb3.TraceContextEgress, + }, + } + + return client, nil +} + +func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags, client *http.Client) (*openIDMetadata, error) { resp, err := client.Get(features.OIDCDiscoveryBaseURL() + "/.well-known/openid-configuration") if err != nil { return nil, fmt.Errorf("could not get response: %w", err) diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index dd12b93ebfb..4d1852002ac 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -27,6 +27,10 @@ import ( "testing" "time" + "knative.dev/eventing/pkg/eventingtls" + filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + "knative.dev/pkg/system" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/reconciler/broker/resources" @@ -42,6 +46,7 @@ import ( "k8s.io/apimachinery/pkg/types" "knative.dev/pkg/apis" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" + filteredconfigmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered/fake" "knative.dev/pkg/logging" reconcilertesting "knative.dev/pkg/reconciler/testing" @@ -60,6 +65,7 @@ import ( // Fake injection client _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/pkg/client/injection/kube/client/fake" + _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" ) const ( @@ -425,10 +431,11 @@ func TestReceiver(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector) ctx = feature.ToContext(ctx, feature.Flags{ feature.OIDCAuthentication: feature.Enabled, }) + trustBundleConfigMapLister := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) fh := fakeHandler{ failRequest: tc.requestFails, @@ -444,7 +451,7 @@ func TestReceiver(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContext(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx)) for _, trig := range tc.triggers { // Replace the SubscriberURI to point at our fake server. @@ -641,7 +648,8 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector) + trustBundleConfigMapLister := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) fh := fakeHandler{ t: t, @@ -653,7 +661,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContext(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx)) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { @@ -989,3 +997,8 @@ func makeEmptyResponse(status int) *http.Response { } return r } + +func SetUpInformerSelector(ctx context.Context) context.Context { + ctx = filteredFactory.WithSelectors(ctx, eventingtls.TrustBundleLabelSelector) + return ctx +} diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index 3956bc6dcd0..991566e2182 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -26,6 +26,11 @@ import ( "testing" "time" + "knative.dev/eventing/pkg/eventingtls" + filteredconfigmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered/fake" + filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + "knative.dev/pkg/system" + "github.com/cloudevents/sdk-go/v2/client" "github.com/cloudevents/sdk-go/v2/event" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" @@ -50,6 +55,7 @@ import ( // Fake injection client _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/pkg/client/injection/kube/client/fake" + _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" ) const ( @@ -263,7 +269,8 @@ func TestHandler_ServeHTTP(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - ctx, _ := reconcilertesting.SetupFakeContext(t) + ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector) + trustBundleConfigMapLister := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) s := httptest.NewServer(tc.handler) defer s.Close() @@ -292,7 +299,7 @@ func TestHandler_ServeHTTP(t *testing.T) { } tokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContextOrDefaults(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContextOrDefaults(ctx)) h, err := NewHandler(logger, &mockReporter{}, @@ -402,3 +409,8 @@ func withUninitializedAnnotations(b *eventingv1.Broker) *eventingv1.Broker { b.Status.Annotations = nil return b } + +func SetUpInformerSelector(ctx context.Context) context.Context { + ctx = filteredFactory.WithSelectors(ctx, eventingtls.TrustBundleLabelSelector) + return ctx +} diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index 2c53e2ed9a3..8ee3af7da34 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -88,7 +88,7 @@ func NewController( ) *controller.Impl { logger := logging.FromContext(ctx) - trustBundleConfigMapInformer := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector) + trustBundleConfigMapLister := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) // Setup trace publishing. iw := cmw.(*configmapinformer.InformedWatcher) @@ -127,7 +127,7 @@ func NewController( oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) clientConfig := eventingtls.ClientConfig{ - TrustBundleConfigMapLister: trustBundleConfigMapInformer.Lister().ConfigMaps(system.Namespace()), + TrustBundleConfigMapLister: trustBundleConfigMapLister, } var globalResync func(obj interface{}) @@ -146,7 +146,7 @@ func NewController( eventingClient: eventingclient.Get(ctx).EventingV1beta2(), eventTypeLister: eventtypeinformer.Get(ctx).Lister(), eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider), - authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()), + authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()), clientConfig: clientConfig, inMemoryChannelLister: inmemorychannelInformer.Lister(), } @@ -156,7 +156,7 @@ func NewController( }) globalResync = func(_ interface{}) { - r.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()) + r.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) impl.GlobalResync(inmemorychannelInformer.Informer()) } From 4bd5aef19941b9c462786aeced693cbc2b0b1d3b Mon Sep 17 00:00:00 2001 From: Cali0707 Date: Sun, 22 Sep 2024 20:21:57 -0400 Subject: [PATCH 3/5] refactor: pass configmap watcher to auth verifier constructor instead of feature flags Signed-off-by: Cali0707 --- cmd/broker/filter/main.go | 3 +-- cmd/broker/ingress/main.go | 3 +-- cmd/jobsink/main.go | 9 ++----- pkg/auth/verifier.go | 19 +++++++++++--- pkg/broker/filter/filter_handler.go | 6 ++--- pkg/broker/filter/filter_handler_test.go | 26 +++++++++++++++++-- pkg/broker/ingress/ingress_handler.go | 6 ++--- pkg/broker/ingress/ingress_handler_test.go | 14 +++++++++- .../inmemorychannel/dispatcher/controller.go | 3 +-- 9 files changed, 64 insertions(+), 25 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 075ec8ba29d..e491670f934 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -136,7 +136,6 @@ func main() { } handler.EventTypeCreator = autoCreate } - handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags) }) featureStore.WatchConfigs(configMapWatcher) @@ -156,7 +155,7 @@ func main() { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher) handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapLister, ctxFunc) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index db5ddb754fd..bd8e76d8fea 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -159,7 +159,6 @@ func main() { } handler.EvenTypeHandler = autoCreate } - handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags) }) featureStore.WatchConfigs(configMapWatcher) @@ -171,7 +170,7 @@ func main() { reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher) handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapLister, ctxFunc) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index 1c953b50a5e..a79cf5d7655 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -114,12 +114,7 @@ func main() { trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) var h *Handler - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { - logger.Info("Updated", zap.String("name", name), zap.Any("value", value)) - if flags, ok := value.(feature.Flags); ok && h != nil { - h.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, flags) - } - }) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) featureStore.WatchConfigs(configMapWatcher) // Decorate contexts with the current state of the feature config. @@ -131,7 +126,7 @@ func main() { k8s: kubeclient.Get(ctx), lister: jobsink.Get(ctx).Lister(), withContext: ctxFunc, - authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()), + authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher), } tlsConfig, err := getServerTLSConfig(ctx) diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index 85027eac04b..c76c9df8b8d 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -30,6 +30,7 @@ import ( "go.opencensus.io/plugin/ochttp" corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/eventing/pkg/eventingtls" + "knative.dev/pkg/configmap" "knative.dev/pkg/network" "knative.dev/pkg/tracing/propagation/tracecontextb3" @@ -65,7 +66,7 @@ type IDToken struct { AccessTokenHash string } -func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, features feature.Flags) *Verifier { +func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, cmw configmap.Watcher) *Verifier { tokenHandler := &Verifier{ logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), restConfig: injection.GetConfig(ctx), @@ -73,7 +74,16 @@ func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1. trustBundleConfigMapLister: trustBundleConfigMapLister, } - if err := tokenHandler.initOIDCProvider(ctx, features); err != nil { + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if features, ok := value.(feature.Flags); ok { + if err := tokenHandler.initOIDCProvider(ctx, features); err != nil { + tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider after config update. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err)) + } + } + }) + featureStore.WatchConfigs(cmw) + + if err := tokenHandler.initOIDCProvider(ctx, featureStore.Load()); err != nil { tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err)) } @@ -243,11 +253,14 @@ func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) ctx = oidc.ClientContext(ctx, httpClient) // get OIDC provider - v.provider, err = oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL()) + provider, err := oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL()) if err != nil { return fmt.Errorf("could not get OIDC provider: %w", err) } + // provider is valid, update it + v.provider = provider + v.logger.Debug("updated OIDC provider config", zap.Any("discovery-config", discovery)) return nil diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 78fb902c5ef..08ddf1c1a4f 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -90,7 +90,7 @@ type Handler struct { logger *zap.Logger withContext func(ctx context.Context) context.Context filtersMap *subscriptionsapi.FiltersMap - TokenVerifier *auth.Verifier + tokenVerifier *auth.Verifier EventTypeCreator *eventtype.EventTypeAutoHandler } @@ -153,7 +153,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.Verifier, oidcTokenProvi brokerLister: brokerInformer.Lister(), subscriptionLister: subscriptionInformer.Lister(), logger: logger, - TokenVerifier: tokenVerifier, + tokenVerifier: tokenVerifier, withContext: wc, filtersMap: fm, }, nil @@ -225,7 +225,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } subscriptionFullIdentity := fmt.Sprintf("system:serviceaccount:%s:%s", subscription.Namespace, *subscription.Status.Auth.ServiceAccountName) - err = h.TokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer) + err = h.tokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer) if err != nil { h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 4d1852002ac..b15f2fde3df 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -29,6 +29,7 @@ import ( "knative.dev/eventing/pkg/eventingtls" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + "knative.dev/pkg/configmap" "knative.dev/pkg/system" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" @@ -42,6 +43,7 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zaptest" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "knative.dev/pkg/apis" @@ -451,7 +453,17 @@ func TestReceiver(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + feature.OIDCAuthentication: string(feature.Enabled), + }, + }, + )) for _, trig := range tc.triggers { // Replace the SubscriberURI to point at our fake server. @@ -661,7 +673,17 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + feature.OIDCAuthentication: string(feature.Enabled), + }, + }, + )) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index c183832b8ba..bdb817e6796 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -73,7 +73,7 @@ type Handler struct { eventDispatcher *kncloudevents.Dispatcher - TokenVerifier *auth.Verifier + tokenVerifier *auth.Verifier withContext func(ctx context.Context) context.Context } @@ -128,7 +128,7 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve Logger: logger, BrokerLister: brokerInformer.Lister(), eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider), - TokenVerifier: tokenVerifier, + tokenVerifier: tokenVerifier, withContext: withContext, }, nil } @@ -237,7 +237,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { if broker.Status.Address != nil { audience = broker.Status.Address.Audience } - err = h.TokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer) + err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer) if err != nil { h.Logger.Warn("Failed to verify AuthN and AuthZ.", zap.Error(err)) return diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index 991566e2182..f30174c1476 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -36,10 +36,12 @@ import ( cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/google/go-cmp/cmp" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/configmap" reconcilertesting "knative.dev/pkg/reconciler/testing" @@ -299,7 +301,17 @@ func TestHandler_ServeHTTP(t *testing.T) { } tokenProvider := auth.NewOIDCTokenProvider(ctx) - authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContextOrDefaults(ctx)) + authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-features", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + feature.OIDCAuthentication: string(feature.Enabled), + }, + }, + )) h, err := NewHandler(logger, &mockReporter{}, diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index 8ee3af7da34..2eac2c0771c 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -146,7 +146,7 @@ func NewController( eventingClient: eventingclient.Get(ctx).EventingV1beta2(), eventTypeLister: eventtypeinformer.Get(ctx).Lister(), eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider), - authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()), + authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, cmw), clientConfig: clientConfig, inMemoryChannelLister: inmemorychannelInformer.Lister(), } @@ -156,7 +156,6 @@ func NewController( }) globalResync = func(_ interface{}) { - r.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()) impl.GlobalResync(inmemorychannelInformer.Informer()) } From 057c69d9c86a82aad4227f214388ba93fe2ccc81 Mon Sep 17 00:00:00 2001 From: Cali0707 Date: Sat, 5 Oct 2024 12:23:12 -0400 Subject: [PATCH 4/5] feat: add rw lock for provider access Signed-off-by: Cali0707 --- pkg/auth/verifier.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index c76c9df8b8d..1474af42d23 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -25,6 +25,7 @@ import ( "net" "net/http" "strings" + "sync" "time" "go.opencensus.io/plugin/ochttp" @@ -52,9 +53,10 @@ import ( type Verifier struct { logger *zap.SugaredLogger restConfig *rest.Config - provider *oidc.Provider eventPolicyLister v1alpha1.EventPolicyLister trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister + m *sync.RWMutex + provider *oidc.Provider } type IDToken struct { @@ -211,6 +213,9 @@ func (v *Verifier) verifyAuthZ(ctx context.Context, features feature.Flags, idTo // verifyJWT verifies the given JWT for the expected audience and returns the parsed ID token. func (v *Verifier) verifyJWT(ctx context.Context, jwt, audience string) (*IDToken, error) { + v.m.RLock() + defer v.m.RUnlock() + if v.provider == nil { return nil, fmt.Errorf("provider is nil. Is the OIDC provider config correct?") } @@ -259,6 +264,8 @@ func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) } // provider is valid, update it + v.m.Lock() + defer v.m.Unlock() v.provider = provider v.logger.Debug("updated OIDC provider config", zap.Any("discovery-config", discovery)) From e5b9e23f8ef2abe7eae5e8b041820445cbb9799f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Wed, 9 Oct 2024 16:05:40 +0200 Subject: [PATCH 5/5] Fix verifier rwmutex --- pkg/auth/verifier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/auth/verifier.go b/pkg/auth/verifier.go index 1474af42d23..df5c9d402df 100644 --- a/pkg/auth/verifier.go +++ b/pkg/auth/verifier.go @@ -55,7 +55,7 @@ type Verifier struct { restConfig *rest.Config eventPolicyLister v1alpha1.EventPolicyLister trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister - m *sync.RWMutex + m sync.RWMutex provider *oidc.Provider }