Skip to content

Commit

Permalink
refactor: pass configmap watcher to auth verifier constructor instead…
Browse files Browse the repository at this point in the history
… of feature flags

Signed-off-by: Cali0707 <[email protected]>
  • Loading branch information
Cali0707 committed Sep 23, 2024
1 parent eee1320 commit 4bd5aef
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 25 deletions.
3 changes: 1 addition & 2 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func main() {
}
handler.EventTypeCreator = autoCreate
}
handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags)
})
featureStore.WatchConfigs(configMapWatcher)

Expand All @@ -156,7 +155,7 @@ func main() {
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load())
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher)
handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapLister, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
Expand Down
3 changes: 1 addition & 2 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func main() {
}
handler.EvenTypeHandler = autoCreate
}
handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags)
})
featureStore.WatchConfigs(configMapWatcher)

Expand All @@ -171,7 +170,7 @@ func main() {
reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load())
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher)
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapLister, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
Expand Down
9 changes: 2 additions & 7 deletions cmd/jobsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,7 @@ func main() {
trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
var h *Handler

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
logger.Info("Updated", zap.String("name", name), zap.Any("value", value))
if flags, ok := value.(feature.Flags); ok && h != nil {
h.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, flags)
}
})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(configMapWatcher)

// Decorate contexts with the current state of the feature config.
Expand All @@ -131,7 +126,7 @@ func main() {
k8s: kubeclient.Get(ctx),
lister: jobsink.Get(ctx).Lister(),
withContext: ctxFunc,
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()),
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher),
}

tlsConfig, err := getServerTLSConfig(ctx)
Expand Down
19 changes: 16 additions & 3 deletions pkg/auth/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opencensus.io/plugin/ochttp"
corev1listers "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/pkg/configmap"
"knative.dev/pkg/network"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

Expand Down Expand Up @@ -65,15 +66,24 @@ type IDToken struct {
AccessTokenHash string
}

func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, features feature.Flags) *Verifier {
func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, cmw configmap.Watcher) *Verifier {
tokenHandler := &Verifier{
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
eventPolicyLister: eventPolicyLister,
trustBundleConfigMapLister: trustBundleConfigMapLister,
}

if err := tokenHandler.initOIDCProvider(ctx, features); err != nil {
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if features, ok := value.(feature.Flags); ok {
if err := tokenHandler.initOIDCProvider(ctx, features); err != nil {
tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider after config update. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err))
}
}
})
featureStore.WatchConfigs(cmw)

if err := tokenHandler.initOIDCProvider(ctx, featureStore.Load()); err != nil {
tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err))
}

Expand Down Expand Up @@ -243,11 +253,14 @@ func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags)
ctx = oidc.ClientContext(ctx, httpClient)

// get OIDC provider
v.provider, err = oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL())
provider, err := oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL())
if err != nil {
return fmt.Errorf("could not get OIDC provider: %w", err)
}

// provider is valid, update it
v.provider = provider

v.logger.Debug("updated OIDC provider config", zap.Any("discovery-config", discovery))

return nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Handler struct {
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
TokenVerifier *auth.Verifier
tokenVerifier *auth.Verifier
EventTypeCreator *eventtype.EventTypeAutoHandler
}

Expand Down Expand Up @@ -153,7 +153,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.Verifier, oidcTokenProvi
brokerLister: brokerInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
logger: logger,
TokenVerifier: tokenVerifier,
tokenVerifier: tokenVerifier,
withContext: wc,
filtersMap: fm,
}, nil
Expand Down Expand Up @@ -225,7 +225,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
}

subscriptionFullIdentity := fmt.Sprintf("system:serviceaccount:%s:%s", subscription.Namespace, *subscription.Status.Auth.ServiceAccountName)
err = h.TokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer)
err = h.tokenVerifier.VerifyRequestFromSubject(ctx, features, &audience, subscriptionFullIdentity, request, writer)
if err != nil {
h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down
26 changes: 24 additions & 2 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"knative.dev/eventing/pkg/eventingtls"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/system"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
Expand All @@ -42,6 +43,7 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -451,7 +453,17 @@ func TestReceiver(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx))
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
Namespace: "knative-eventing",
},
Data: map[string]string{
feature.OIDCAuthentication: string(feature.Enabled),
},
},
))

for _, trig := range tc.triggers {
// Replace the SubscriberURI to point at our fake server.
Expand Down Expand Up @@ -661,7 +673,17 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx))
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
Namespace: "knative-eventing",
},
Data: map[string]string{
feature.OIDCAuthentication: string(feature.Enabled),
},
},
))

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
Expand Down
6 changes: 3 additions & 3 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Handler struct {

eventDispatcher *kncloudevents.Dispatcher

TokenVerifier *auth.Verifier
tokenVerifier *auth.Verifier

withContext func(ctx context.Context) context.Context
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve
Logger: logger,
BrokerLister: brokerInformer.Lister(),
eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider),
TokenVerifier: tokenVerifier,
tokenVerifier: tokenVerifier,
withContext: withContext,
}, nil
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if broker.Status.Address != nil {
audience = broker.Status.Address.Audience
}
err = h.TokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer)
err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer)
if err != nil {
h.Logger.Warn("Failed to verify AuthN and AuthZ.", zap.Error(err))
return
Expand Down
14 changes: 13 additions & 1 deletion pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ import (
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"

duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/configmap"

reconcilertesting "knative.dev/pkg/reconciler/testing"

Expand Down Expand Up @@ -299,7 +301,17 @@ func TestHandler_ServeHTTP(t *testing.T) {
}

tokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContextOrDefaults(ctx))
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
Namespace: "knative-eventing",
},
Data: map[string]string{
feature.OIDCAuthentication: string(feature.Enabled),
},
},
))

h, err := NewHandler(logger,
&mockReporter{},
Expand Down
3 changes: 1 addition & 2 deletions pkg/reconciler/inmemorychannel/dispatcher/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewController(
eventingClient: eventingclient.Get(ctx).EventingV1beta2(),
eventTypeLister: eventtypeinformer.Get(ctx).Lister(),
eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider),
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()),
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, cmw),
clientConfig: clientConfig,
inMemoryChannelLister: inmemorychannelInformer.Lister(),
}
Expand All @@ -156,7 +156,6 @@ func NewController(
})

globalResync = func(_ interface{}) {
r.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load())
impl.GlobalResync(inmemorychannelInformer.Informer())
}

Expand Down

0 comments on commit 4bd5aef

Please sign in to comment.