Skip to content

Commit d499be4

Browse files
[release-1.18] Fix mt-broker-ingress auth to work with structured event too (#8714)
* Fix mt-broker-ingress auth to work with structured event too * Fix channel receiver auth to work with structured event too * Add unit test --------- Co-authored-by: Christoph Stäbler <[email protected]>
1 parent d952cdf commit d499be4

File tree

6 files changed

+115
-46
lines changed

6 files changed

+115
-46
lines changed

pkg/auth/verifier.go

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package auth
1818

1919
import (
20-
"bytes"
2120
"context"
2221
"encoding/json"
2322
"fmt"
@@ -31,6 +30,7 @@ import (
3130
"go.opencensus.io/plugin/ochttp"
3231
corev1listers "k8s.io/client-go/listers/core/v1"
3332
"knative.dev/eventing/pkg/eventingtls"
33+
"knative.dev/eventing/pkg/utils"
3434
"knative.dev/pkg/configmap"
3535
"knative.dev/pkg/network"
3636
"knative.dev/pkg/tracing/propagation/tracecontextb3"
@@ -160,7 +160,7 @@ func (v *Verifier) verifyAuthN(ctx context.Context, audience *string, req *http.
160160
// verifyAuthZ verifies if the given idToken is allowed by the resources eventPolicyStatus
161161
func (v *Verifier) verifyAuthZ(ctx context.Context, features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, req *http.Request, resp http.ResponseWriter) error {
162162
if len(policyRefs) > 0 {
163-
req, err := copyRequest(req)
163+
req, err := utils.CopyRequest(req)
164164
if err != nil {
165165
resp.WriteHeader(http.StatusInternalServerError)
166166
return fmt.Errorf("failed to copy request body: %w", err)
@@ -332,35 +332,6 @@ func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags, client *ht
332332
return openIdConfig, nil
333333
}
334334

335-
// copyRequest makes a copy of the http request which can be consumed as needed, leaving the original request
336-
// able to be consumed as well.
337-
func copyRequest(req *http.Request) (*http.Request, error) {
338-
// check if we actually need to copy the body, otherwise we can return the original request
339-
if req.Body == nil || req.Body == http.NoBody {
340-
return req, nil
341-
}
342-
343-
var buf bytes.Buffer
344-
if _, err := buf.ReadFrom(req.Body); err != nil {
345-
return nil, fmt.Errorf("failed to read request body while copying it: %w", err)
346-
}
347-
348-
if err := req.Body.Close(); err != nil {
349-
return nil, fmt.Errorf("failed to close original request body ready while copying request: %w", err)
350-
}
351-
352-
// set the original request body to be readable again
353-
req.Body = io.NopCloser(&buf)
354-
355-
// return a new request with a readable body and same headers as the original
356-
// we don't need to set any other fields as cloudevents only uses the headers
357-
// and body to construct the Message/Event.
358-
return &http.Request{
359-
Header: req.Header,
360-
Body: io.NopCloser(bytes.NewReader(buf.Bytes())),
361-
}, nil
362-
}
363-
364335
type openIDMetadata struct {
365336
Issuer string `json:"issuer"`
366337
JWKSURI string `json:"jwks_uri"`

pkg/broker/ingress/ingress_handler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,14 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
199199
return
200200
}
201201

202+
// copy the request, as we need access to the body (in case of a structured event) for the auth checks too
203+
reqCp, err := utils.CopyRequest(request)
204+
if err != nil {
205+
h.Logger.Error("Failed to copy request", zap.Error(err))
206+
writer.WriteHeader(http.StatusInternalServerError)
207+
return
208+
}
209+
202210
ctx := h.withContext(request.Context())
203211

204212
message := cehttp.NewMessageFromHttpRequest(request)
@@ -243,7 +251,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
243251
if broker.Status.Address != nil {
244252
audience = broker.Status.Address.Audience
245253
}
246-
err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, request, writer)
254+
err = h.tokenVerifier.VerifyRequest(ctx, features, audience, brokerNamespace, broker.Status.Policies, reqCp, writer)
247255
if err != nil {
248256
h.Logger.Warn("Failed to verify AuthN and AuthZ.", zap.Error(err))
249257
return

pkg/channel/event_receiver.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,14 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
250250
args.EventScheme = "http"
251251
}
252252

253+
// copy the request, as we need access to the body (in case of a structured event) for the auth checks too
254+
reqCopy, err := utils.CopyRequest(request)
255+
if err != nil {
256+
r.logger.Error("Failed to copy request", zap.Error(err))
257+
response.WriteHeader(nethttp.StatusInternalServerError)
258+
return
259+
}
260+
253261
event, err := http.NewEventFromHTTPRequest(request)
254262
if err != nil {
255263
r.logger.Warn("failed to extract event from request", zap.Error(err))
@@ -283,7 +291,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
283291
return
284292
}
285293

286-
err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, request, response)
294+
err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, reqCopy, response)
287295
if err != nil {
288296
r.logger.Warn("could not verify authn and authz of request", zap.Error(err))
289297
return

pkg/utils/utils.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ limitations under the License.
1717
package utils
1818

1919
import (
20+
"bytes"
21+
"fmt"
22+
"io"
23+
"net/http"
2024
"regexp"
2125
"strings"
2226

@@ -91,3 +95,32 @@ func GenerateFixedName(owner metav1.Object, prefix string) string {
9195
// A dot must be followed by [a-z0-9] to be DNS1123 compliant. Make sure we are not joining a dot and a dash.
9296
return strings.TrimSuffix(prefix, ".") + uid
9397
}
98+
99+
// CopyRequest makes a copy of the http request which can be consumed as needed, leaving the original request
100+
// able to be consumed as well.
101+
func CopyRequest(req *http.Request) (*http.Request, error) {
102+
// check if we actually need to copy the body, otherwise we can return the original request
103+
if req.Body == nil || req.Body == http.NoBody {
104+
return req, nil
105+
}
106+
107+
var buf bytes.Buffer
108+
if _, err := buf.ReadFrom(req.Body); err != nil {
109+
return nil, fmt.Errorf("failed to read request body while copying it: %w", err)
110+
}
111+
112+
if err := req.Body.Close(); err != nil {
113+
return nil, fmt.Errorf("failed to close original request body ready while copying request: %w", err)
114+
}
115+
116+
// set the original request body to be readable again
117+
req.Body = io.NopCloser(&buf)
118+
119+
// return a new request with a readable body and same headers as the original
120+
// we don't need to set any other fields as cloudevents only uses the headers
121+
// and body to construct the Message/Event.
122+
return &http.Request{
123+
Header: req.Header,
124+
Body: io.NopCloser(bytes.NewReader(buf.Bytes())),
125+
}, nil
126+
}

pkg/utils/utils_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ limitations under the License.
1717
package utils
1818

1919
import (
20+
"bytes"
2021
"fmt"
22+
"io"
23+
"net/http"
2124
"strings"
2225
"testing"
2326

@@ -169,3 +172,45 @@ func TestToDNS1123Subdomain(t *testing.T) {
169172
})
170173
}
171174
}
175+
176+
func TestCopyRequest(t *testing.T) {
177+
const (
178+
contentType = "application/json"
179+
authorization = "Bearer token"
180+
)
181+
182+
originalRequest := &http.Request{
183+
Header: map[string][]string{
184+
"Content-Type": {contentType},
185+
"Authorization": {authorization},
186+
},
187+
Body: io.NopCloser(strings.NewReader("test content")),
188+
}
189+
190+
copiedReq, err := CopyRequest(originalRequest)
191+
192+
if err != nil {
193+
t.Errorf("Unexpected error: %v", err)
194+
}
195+
196+
if copiedReq.Header.Get("Content-Type") != contentType {
197+
t.Error("Header not copied correctly")
198+
}
199+
if copiedReq.Header.Get("Authorization") != authorization {
200+
t.Error("Authorization header not copied correctly")
201+
}
202+
203+
originalBody, err := io.ReadAll(originalRequest.Body)
204+
if err != nil {
205+
t.Fatalf("Failed to read original body: %v", err)
206+
}
207+
208+
copiedBody, err := io.ReadAll(copiedReq.Body)
209+
if err != nil {
210+
t.Fatalf("Failed to read copied body: %v", err)
211+
}
212+
213+
if !bytes.Equal(originalBody, copiedBody) {
214+
t.Errorf("Body not copied correctly. Original: %s, Copied: %s", originalBody, copiedBody)
215+
}
216+
}

test/rekt/features/authz/addressable_authz_conformance.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"time"
2323

24+
cloudevents "github.com/cloudevents/sdk-go/v2"
2425
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
2526
"knative.dev/eventing/test/rekt/resources/eventpolicy"
2627
"knative.dev/eventing/test/rekt/resources/pingsource"
@@ -41,7 +42,8 @@ func AddressableAuthZConformance(gvr schema.GroupVersionResource, kind, name str
4142
fs := feature.FeatureSet{
4243
Name: fmt.Sprintf("%s handles authorization features correctly", kind),
4344
Features: []*feature.Feature{
44-
addressableRespectsEventPolicyFilters(gvr, kind, name),
45+
addressableRespectsEventPolicyFilters(gvr, kind, name, cloudevents.EncodingBinary),
46+
addressableRespectsEventPolicyFilters(gvr, kind, name, cloudevents.EncodingStructured),
4547
},
4648
}
4749

@@ -57,16 +59,18 @@ func AddressableAuthZConformanceRequestHandling(gvr schema.GroupVersionResource,
5759
fs := feature.FeatureSet{
5860
Name: fmt.Sprintf("%s handles authorization in requests correctly", kind),
5961
Features: []*feature.Feature{
60-
addressableAllowsAuthorizedRequest(gvr, kind, name),
61-
addressableRejectsUnauthorizedRequest(gvr, kind, name),
62+
addressableAllowsAuthorizedRequest(gvr, kind, name, cloudevents.EncodingBinary),
63+
addressableAllowsAuthorizedRequest(gvr, kind, name, cloudevents.EncodingStructured),
64+
addressableRejectsUnauthorizedRequest(gvr, kind, name, cloudevents.EncodingBinary),
65+
addressableRejectsUnauthorizedRequest(gvr, kind, name, cloudevents.EncodingStructured),
6266
addressableBecomesUnreadyOnUnreadyEventPolicy(gvr, kind, name),
6367
},
6468
}
6569
return &fs
6670
}
6771

68-
func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature {
69-
f := feature.NewFeatureNamed(fmt.Sprintf("%s accepts authorized request", kind))
72+
func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature {
73+
f := feature.NewFeatureNamed(fmt.Sprintf("%s accepts authorized request with %s encoding for input event", kind, inputEventEncoding))
7074

7175
f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled())
7276
f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
@@ -95,7 +99,7 @@ func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, n
9599
f.Requirement("install source", eventshub.Install(
96100
source,
97101
eventshub.StartSenderToResourceTLS(gvr, name, nil),
98-
eventshub.InputEvent(event),
102+
eventshub.InputEventWithEncoding(event, inputEventEncoding),
99103
eventshub.OIDCSubject(sourceSubject),
100104
))
101105

@@ -106,8 +110,8 @@ func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, n
106110
return f
107111
}
108112

109-
func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature {
110-
f := feature.NewFeatureNamed(fmt.Sprintf("%s rejects unauthorized request", kind))
113+
func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature {
114+
f := feature.NewFeatureNamed(fmt.Sprintf("%s rejects unauthorized request with %s encoding for input event", kind, inputEventEncoding))
111115

112116
f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled())
113117
f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
@@ -132,7 +136,7 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind
132136
f.Requirement("install source", eventshub.Install(
133137
source,
134138
eventshub.StartSenderToResourceTLS(gvr, name, nil),
135-
eventshub.InputEvent(event),
139+
eventshub.InputEventWithEncoding(event, inputEventEncoding),
136140
eventshub.InitialSenderDelay(10*time.Second),
137141
))
138142

@@ -143,8 +147,8 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind
143147
return f
144148
}
145149

146-
func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string) *feature.Feature {
147-
f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter", kind))
150+
func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string, inputEventEncoding cloudevents.Encoding) *feature.Feature {
151+
f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter with %s encoding for input event", kind, inputEventEncoding))
148152

149153
f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled())
150154
f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
@@ -188,14 +192,14 @@ func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind
188192
f.Requirement("install source 1", eventshub.Install(
189193
source1,
190194
eventshub.StartSenderToResourceTLS(gvr, name, nil),
191-
eventshub.InputEvent(event1),
195+
eventshub.InputEventWithEncoding(event1, inputEventEncoding),
192196
eventshub.OIDCSubject(sourceSubject1),
193197
))
194198

195199
f.Requirement("install source 2", eventshub.Install(
196200
source2,
197201
eventshub.StartSenderToResourceTLS(gvr, name, nil),
198-
eventshub.InputEvent(event2),
202+
eventshub.InputEventWithEncoding(event2, inputEventEncoding),
199203
eventshub.OIDCSubject(sourceSubject2),
200204
))
201205

0 commit comments

Comments
 (0)