Skip to content

Commit

Permalink
Use Trustbundles and public CAs, when issuer URL is not kubernetes.de…
Browse files Browse the repository at this point in the history
…fault.svc
  • Loading branch information
creydr committed Sep 17, 2024
1 parent 837d926 commit eee1320
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 37 deletions.
9 changes: 5 additions & 4 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func main() {
// Watch the observability config map and dynamically update request logs.
configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(sl, atomicLevel, component))

trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

var featureStore *feature.Store
var handler *filter.Handler

Expand All @@ -134,7 +136,7 @@ func main() {
}
handler.EventTypeCreator = autoCreate
}
handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureFlags)
handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags)
})
featureStore.WatchConfigs(configMapWatcher)

Expand All @@ -154,9 +156,8 @@ func main() {
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load())
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load())
handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapLister, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func main() {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

var featureStore *feature.Store
var handler *ingress.Handler

Expand All @@ -157,7 +159,7 @@ func main() {
}
handler.EvenTypeHandler = autoCreate
}
handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureFlags)
handler.TokenVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureFlags)
})
featureStore.WatchConfigs(configMapWatcher)

Expand All @@ -169,9 +171,8 @@ func main() {
reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load())
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapInformer, ctxFunc)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load())
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapLister, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/jobsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"net/http"
"strings"

configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"

"github.com/cloudevents/sdk-go/v2/binding"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"go.uber.org/zap"
Expand Down Expand Up @@ -70,9 +73,13 @@ func main() {

cfg := injection.ParseAndGetRESTConfigOrDie()
ctx = injection.WithConfig(ctx, cfg)
ctx = filteredFactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
)

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
ctx = injection.WithConfig(ctx, cfg)

loggingConfig, err := cmdbroker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName())
if err != nil {
log.Fatal("Error loading/parsing logging configuration:", err)
Expand Down Expand Up @@ -104,12 +111,13 @@ func main() {

logger.Info("Starting the JobSink Ingress")

trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
var h *Handler

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
logger.Info("Updated", zap.String("name", name), zap.Any("value", value))
if flags, ok := value.(feature.Flags); ok && h != nil {
h.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), flags)
h.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, flags)
}
})
featureStore.WatchConfigs(configMapWatcher)
Expand All @@ -123,7 +131,7 @@ func main() {
k8s: kubeclient.Get(ctx),
lister: jobsink.Get(ctx).Lister(),
withContext: ctxFunc,
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()),
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()),
}

tlsConfig, err := getServerTLSConfig(ctx)
Expand Down
69 changes: 52 additions & 17 deletions pkg/auth/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"

"go.opencensus.io/plugin/ochttp"
corev1listers "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/pkg/network"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"

Expand All @@ -42,10 +49,11 @@ import (
)

type Verifier struct {
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
eventPolicyLister v1alpha1.EventPolicyLister
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
eventPolicyLister v1alpha1.EventPolicyLister
trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister
}

type IDToken struct {
Expand All @@ -57,11 +65,12 @@ type IDToken struct {
AccessTokenHash string
}

func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, features feature.Flags) *Verifier {
func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, features feature.Flags) *Verifier {
tokenHandler := &Verifier{
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
eventPolicyLister: eventPolicyLister,
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
eventPolicyLister: eventPolicyLister,
trustBundleConfigMapLister: trustBundleConfigMapLister,
}

if err := tokenHandler.initOIDCProvider(ctx, features); err != nil {
Expand Down Expand Up @@ -216,7 +225,12 @@ func (v *Verifier) verifyJWT(ctx context.Context, jwt, audience string) (*IDToke
}

func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) error {
discovery, err := v.getKubernetesOIDCDiscovery(features)
httpClient, err := v.getHTTPClient(features)
if err != nil {
return fmt.Errorf("could not get HTTP client: %w", err)
}

discovery, err := v.getKubernetesOIDCDiscovery(features, httpClient)
if err != nil {
return fmt.Errorf("could not load Kubernetes OIDC discovery information: %w", err)
}
Expand All @@ -226,10 +240,6 @@ func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags)
ctx = oidc.InsecureIssuerURLContext(ctx, discovery.Issuer)
}

httpClient, err := v.getHTTPClientForKubeAPIServer()
if err != nil {
return fmt.Errorf("could not get HTTP client with TLS certs of API server: %w", err)
}
ctx = oidc.ClientContext(ctx, httpClient)

// get OIDC provider
Expand All @@ -252,12 +262,37 @@ func (v *Verifier) getHTTPClientForKubeAPIServer() (*http.Client, error) {
return client, nil
}

func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags) (*openIDMetadata, error) {
client, err := v.getHTTPClientForKubeAPIServer()
if err != nil {
return nil, fmt.Errorf("could not get HTTP client for API server: %w", err)
func (v *Verifier) getHTTPClient(features feature.Flags) (*http.Client, error) {
if features.OIDCDiscoveryBaseURL() == "https://kubernetes.default.svc" {
return v.getHTTPClientForKubeAPIServer()
}

var base = http.DefaultTransport.(*http.Transport).Clone()

clientConfig := eventingtls.ClientConfig{
TrustBundleConfigMapLister: v.trustBundleConfigMapLister,
}

base.DialTLSContext = func(ctx context.Context, net, addr string) (net.Conn, error) {
tlsConfig, err := eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("could not get tls client config: %w", err)
}
return network.DialTLSWithBackOff(ctx, net, addr, tlsConfig)
}

client := &http.Client{
// Add output tracing.
Transport: &ochttp.Transport{
Base: base,
Propagation: tracecontextb3.TraceContextEgress,
},
}

return client, nil
}

func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags, client *http.Client) (*openIDMetadata, error) {
resp, err := client.Get(features.OIDCDiscoveryBaseURL() + "/.well-known/openid-configuration")
if err != nil {
return nil, fmt.Errorf("could not get response: %w", err)
Expand Down
21 changes: 17 additions & 4 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"testing"
"time"

"knative.dev/eventing/pkg/eventingtls"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/system"

messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/reconciler/broker/resources"

Expand All @@ -42,6 +46,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
filteredconfigmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered/fake"
"knative.dev/pkg/logging"
reconcilertesting "knative.dev/pkg/reconciler/testing"

Expand All @@ -60,6 +65,7 @@ import (
// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
)

const (
Expand Down Expand Up @@ -425,10 +431,11 @@ func TestReceiver(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)
ctx = feature.ToContext(ctx, feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
})
trustBundleConfigMapLister := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

fh := fakeHandler{
failRequest: tc.requestFails,
Expand All @@ -444,7 +451,7 @@ func TestReceiver(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContext(ctx))
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx))

for _, trig := range tc.triggers {
// Replace the SubscriberURI to point at our fake server.
Expand Down Expand Up @@ -641,7 +648,8 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)
trustBundleConfigMapLister := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

fh := fakeHandler{
t: t,
Expand All @@ -653,7 +661,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContext(ctx))
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContext(ctx))

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
Expand Down Expand Up @@ -989,3 +997,8 @@ func makeEmptyResponse(status int) *http.Response {
}
return r
}

func SetUpInformerSelector(ctx context.Context) context.Context {
ctx = filteredFactory.WithSelectors(ctx, eventingtls.TrustBundleLabelSelector)
return ctx
}
16 changes: 14 additions & 2 deletions pkg/broker/ingress/ingress_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"testing"
"time"

"knative.dev/eventing/pkg/eventingtls"
filteredconfigmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered/fake"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/system"

"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
Expand All @@ -50,6 +55,7 @@ import (
// Fake injection client
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
)

const (
Expand Down Expand Up @@ -263,7 +269,8 @@ func TestHandler_ServeHTTP(t *testing.T) {

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
ctx, _ := reconcilertesting.SetupFakeContext(t, SetUpInformerSelector)
trustBundleConfigMapLister := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

s := httptest.NewServer(tc.handler)
defer s.Close()
Expand Down Expand Up @@ -292,7 +299,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}

tokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), feature.FromContextOrDefaults(ctx))
authVerifier := auth.NewVerifier(ctx, eventpolicyinformerfake.Get(ctx).Lister(), trustBundleConfigMapLister, feature.FromContextOrDefaults(ctx))

h, err := NewHandler(logger,
&mockReporter{},
Expand Down Expand Up @@ -402,3 +409,8 @@ func withUninitializedAnnotations(b *eventingv1.Broker) *eventingv1.Broker {
b.Status.Annotations = nil
return b
}

func SetUpInformerSelector(ctx context.Context) context.Context {
ctx = filteredFactory.WithSelectors(ctx, eventingtls.TrustBundleLabelSelector)
return ctx
}
8 changes: 4 additions & 4 deletions pkg/reconciler/inmemorychannel/dispatcher/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewController(
) *controller.Impl {
logger := logging.FromContext(ctx)

trustBundleConfigMapInformer := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector)
trustBundleConfigMapLister := filteredconfigmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

// Setup trace publishing.
iw := cmw.(*configmapinformer.InformedWatcher)
Expand Down Expand Up @@ -127,7 +127,7 @@ func NewController(
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)

clientConfig := eventingtls.ClientConfig{
TrustBundleConfigMapLister: trustBundleConfigMapInformer.Lister().ConfigMaps(system.Namespace()),
TrustBundleConfigMapLister: trustBundleConfigMapLister,
}

var globalResync func(obj interface{})
Expand All @@ -146,7 +146,7 @@ func NewController(
eventingClient: eventingclient.Get(ctx).EventingV1beta2(),
eventTypeLister: eventtypeinformer.Get(ctx).Lister(),
eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider),
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load()),
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load()),
clientConfig: clientConfig,
inMemoryChannelLister: inmemorychannelInformer.Lister(),
}
Expand All @@ -156,7 +156,7 @@ func NewController(
})

globalResync = func(_ interface{}) {
r.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), featureStore.Load())
r.authVerifier = auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, featureStore.Load())
impl.GlobalResync(inmemorychannelInformer.Informer())
}

Expand Down

0 comments on commit eee1320

Please sign in to comment.