@@ -18,54 +18,49 @@ package handlers
1818
1919import (
2020 "context"
21- "encoding/json "
21+ "strings "
2222
2323 basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2424 eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2525 "sigs.k8s.io/controller-runtime/pkg/log"
2626
2727 "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
28+ bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
29+ helpers "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/utils"
2830 logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2931)
3032
31- const modelHeader = "X-Gateway-Model-Name"
32-
33- type RequestBody struct {
34- Model string `json:"model"`
35- }
36-
3733// HandleRequestBody handles request bodies.
3834func (s * Server ) HandleRequestBody (ctx context.Context , requestBodyBytes []byte ) ([]* eppb.ProcessingResponse , error ) {
3935 logger := log .FromContext (ctx )
4036 var ret []* eppb.ProcessingResponse
4137
42- var requestBody RequestBody
43- if err := json .Unmarshal (requestBodyBytes , & requestBody ); err != nil {
44- metrics .RecordModelNotParsedCounter ()
45- return nil , err
38+ allHeaders , mutatedBodyBytes , err := s .requestChain .Run (requestBodyBytes , s .metaDataKeys , s .registry )
39+
40+ if err != nil {
41+ //TODO: add metric in metrics.go to count "other errors"
42+ logger .V (logutil .DEFAULT ).Info ("error processing body" , "error" , err )
43+ ret , _ := buildEmptyResponsesForMissingModel (s .streaming , requestBodyBytes )
44+ return ret , nil
45+ }
46+
47+ model , ok := allHeaders [bbrplugins .ModelHeader ]
48+
49+ if ! ok {
50+ //TODO: add metric in metrics.go to count "other errors"
51+ logger .V (logutil .DEFAULT ).Info ("manadatory header X-Gateway-Model-Name value is undetermined" )
52+ ret , _ := buildEmptyResponsesForMissingModel (s .streaming , requestBodyBytes )
53+ return ret , nil
4654 }
4755
48- if requestBody . Model == "" {
56+ if strings . TrimSpace ( model ) == "" {
4957 metrics .RecordModelNotInBodyCounter ()
50- logger .V (logutil .DEFAULT ).Info ("Request body does not contain model parameter" )
51- if s .streaming {
52- ret = append (ret , & eppb.ProcessingResponse {
53- Response : & eppb.ProcessingResponse_RequestHeaders {
54- RequestHeaders : & eppb.HeadersResponse {},
55- },
56- })
57- ret = addStreamedBodyResponse (ret , requestBodyBytes )
58- return ret , nil
59- } else {
60- ret = append (ret , & eppb.ProcessingResponse {
61- Response : & eppb.ProcessingResponse_RequestBody {
62- RequestBody : & eppb.BodyResponse {},
63- },
64- })
65- }
58+ ret , _ := buildEmptyResponsesForMissingModel (s .streaming , requestBodyBytes )
6659 return ret , nil
6760 }
6861
62+ logger .V (logutil .DEFAULT ).Info ("model extracted from request body" , "model" , model )
63+
6964 metrics .RecordSuccessCounter ()
7065
7166 if s .streaming {
@@ -78,8 +73,8 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte)
7873 SetHeaders : []* basepb.HeaderValueOption {
7974 {
8075 Header : & basepb.HeaderValue {
81- Key : modelHeader ,
82- RawValue : []byte (requestBody . Model ),
76+ Key : bbrplugins . ModelHeader ,
77+ RawValue : []byte (model ),
8378 },
8479 },
8580 },
@@ -88,7 +83,10 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte)
8883 },
8984 },
9085 })
91- ret = addStreamedBodyResponse (ret , requestBodyBytes )
86+ ret = addStreamedBodyResponse (ret , mutatedBodyBytes )
87+
88+ logger .V (logutil .DEFAULT ).Info ("RESPONSE" , "response" , helpers .PrettyPrintResponses (ret ))
89+
9290 return ret , nil
9391 }
9492
@@ -103,28 +101,33 @@ func (s *Server) HandleRequestBody(ctx context.Context, requestBodyBytes []byte)
103101 SetHeaders : []* basepb.HeaderValueOption {
104102 {
105103 Header : & basepb.HeaderValue {
106- Key : modelHeader ,
107- RawValue : []byte (requestBody . Model ),
104+ Key : bbrplugins . ModelHeader ,
105+ RawValue : []byte (model ),
108106 },
109107 },
110108 },
111109 },
110+ BodyMutation : & eppb.BodyMutation {
111+ Mutation : & eppb.BodyMutation_Body {
112+ Body : mutatedBodyBytes ,
113+ },
114+ },
112115 },
113116 },
114117 },
115118 },
116119 }, nil
117120}
118121
119- func addStreamedBodyResponse (responses []* eppb.ProcessingResponse , requestBodyBytes []byte ) []* eppb.ProcessingResponse {
122+ func addStreamedBodyResponse (responses []* eppb.ProcessingResponse , mutatedBodyBytes []byte ) []* eppb.ProcessingResponse {
120123 return append (responses , & eppb.ProcessingResponse {
121124 Response : & eppb.ProcessingResponse_RequestBody {
122125 RequestBody : & eppb.BodyResponse {
123126 Response : & eppb.CommonResponse {
124127 BodyMutation : & eppb.BodyMutation {
125128 Mutation : & eppb.BodyMutation_StreamedResponse {
126129 StreamedResponse : & eppb.StreamedBodyResponse {
127- Body : requestBodyBytes ,
130+ Body : mutatedBodyBytes ,
128131 EndOfStream : true ,
129132 },
130133 },
@@ -156,3 +159,31 @@ func (s *Server) HandleRequestTrailers(trailers *eppb.HttpTrailers) ([]*eppb.Pro
156159 },
157160 }, nil
158161}
162+
163+ // buildEmptyResponsesForMissingModel is a local helper that returns the appropriate empty responses
164+ // for the "model not found" branch depending on streaming mode.
165+ // It is also used to create empty responses in case of other errors related to running plugins on the body
166+ // This is not very clean and MUST be segregated in the future.
167+ // Corresponding metrics should be defined to make different errors observable
168+ func buildEmptyResponsesForMissingModel (streaming bool , requestBodyBytes []byte ) ([]* eppb.ProcessingResponse , error ) {
169+ var ret []* eppb.ProcessingResponse
170+
171+ if streaming {
172+ // Emit empty headers response, then stream body unchanged.
173+ ret = append (ret , & eppb.ProcessingResponse {
174+ Response : & eppb.ProcessingResponse_RequestHeaders {
175+ RequestHeaders : & eppb.HeadersResponse {},
176+ },
177+ })
178+ ret = addStreamedBodyResponse (ret , requestBodyBytes )
179+ return ret , nil
180+ }
181+
182+ // Non-streaming: emit empty body response.
183+ ret = append (ret , & eppb.ProcessingResponse {
184+ Response : & eppb.ProcessingResponse_RequestBody {
185+ RequestBody : & eppb.BodyResponse {},
186+ },
187+ })
188+ return ret , nil
189+ }
0 commit comments