Skip to content

Commit 4d7f6d5

Browse files
Pluggable BBR framework (WiP)
1 parent 68a5612 commit 4d7f6d5

File tree

10 files changed

+689
-4
lines changed

10 files changed

+689
-4
lines changed

config/charts/body-based-routing/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,7 @@ provider:
1818

1919
inferenceGateway:
2020
name: inference-gateway
21+
22+
env:
23+
- name: REQUEST_PLUGINS_CHAIN
24+
value: "simple-model-extractor"

pkg/bbr/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Body-Based Routing
2-
This package provides an extension that can be deployed to write the `model`
3-
HTTP body parameter as a header (X-Gateway-Model-Name) so as to enable routing capabilities on the
2+
By deafult this package provides a plugable extension that can be to set the `model`
3+
HTTP body parameter as a header (`X-Gateway-Model-Name`) so as to enable routing capabilities on the
44
model name.
55

66
As per OpenAI spec, it is standard for the model name to be included in the

pkg/bbr/framework/interfaces.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package framework
18+
19+
import (
20+
bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
21+
)
22+
23+
const (
24+
RequestPluginChain = "REQUEST_PLUGINS_CHAIN"
25+
ResponsePluginChain = "RESPONSE_PLUGINS_CHAIN"
26+
MaxPluginChainLength = 10 //arbitrary number that can be changed **later**; it's here to short-circuit long inputs
27+
MaxNumberOfPlugins = 100 //arbitrary number that can be changed **later**; can be used to optimize memory allocation for the registry **later**
28+
)
29+
30+
// placeholder for Plugin constructors
31+
type PluginFactoryFunc func() bbrplugins.BBRPlugin //any no-argument function that returns bbrplugins.BBRPlugin can be assigned to this type (including a constructor function)
32+
33+
// ----------------------- Registry Interface --------------------------------------------------
34+
// PluginRegistry defines operations for managing plugin factories and plugin instances
35+
type PluginRegistry interface {
36+
RegisterFactory(typeKey string, factory PluginFactoryFunc) error //constructors
37+
RegisterPlugin(plugin bbrplugins.BBRPlugin) error //registers a plugin instance (the instance is supposed to be created via the factory first)
38+
GetFactory(typeKey string) (PluginFactoryFunc, error)
39+
GetPlugin(typeKey string) (bbrplugins.BBRPlugin, error)
40+
GetFactories() map[string]PluginFactoryFunc
41+
GetPlugins() map[string]bbrplugins.BBRPlugin
42+
ListPlugins() []string
43+
ListFactories() []string
44+
CreatePlugin(typeKey string) (bbrplugins.BBRPlugin, error)
45+
ContainsFactory(typeKey string) bool
46+
ContainsPlugin(typeKey string) bool
47+
String() string
48+
}
49+
50+
// ------------------------ Ordered Plugins Interface ------------------------------------------
51+
// PluginsChain is used to define a specific order of execution of the plugin instances stored in the registry
52+
type PluginsChain interface {
53+
AddPlugin(typeKey string, registry PluginRegistry) error //to be added to the chain the plugin should be registered in the registry first
54+
AddPluginAtInd(typeKey string, i int, r PluginRegistry) error //only affects the instance of the plugin chain
55+
GetPlugin(index int, registry PluginRegistry) (bbrplugins.BBRPlugin, error) //retrieves i-th plugin as defined in the chain from the registry
56+
Length() int
57+
GetPlugins() []string
58+
Run(bodyBytes []byte, metaDataKeys []string, registry PluginRegistry) (map[string]string, []byte, error) //return potentially mutated body and all headers map safely merged
59+
String() string
60+
}

pkg/bbr/framework/registry.go

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package framework
18+
19+
import (
20+
"fmt"
21+
22+
bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
23+
)
24+
25+
// -------------------- INTERFACES -----------------------------------------------------------------------
26+
// Interfaces are defined in "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework/interfaces.go"
27+
28+
// --------------------- PluginRegistry implementation ---------------------------------------------------
29+
30+
// pluginRegistry implements PluginRegistry
31+
type pluginRegistry struct {
32+
pluginsFactory map[string]PluginFactoryFunc //constructors
33+
plugins map[string]bbrplugins.BBRPlugin // instances
34+
}
35+
36+
// NewPluginRegistry creates a new instance of pluginRegistry
37+
func NewPluginRegistry() PluginRegistry {
38+
return &pluginRegistry{
39+
pluginsFactory: make(map[string]PluginFactoryFunc),
40+
plugins: make(map[string]bbrplugins.BBRPlugin),
41+
}
42+
}
43+
44+
// Register a plugin factory by type key (e.g., "ModelSelector", "MetadataExtractor")
45+
func (r *pluginRegistry) RegisterFactory(typeKey string, factory PluginFactoryFunc) error {
46+
//validate whether already registered
47+
alreadyRegistered := r.ContainsFactory(typeKey)
48+
if alreadyRegistered {
49+
err := fmt.Errorf("factory fot plugin interface type %s is already registered", typeKey)
50+
return err
51+
}
52+
r.pluginsFactory[typeKey] = factory
53+
54+
return nil
55+
}
56+
57+
// Register a plugin instance (created through the appropriate factory)
58+
func (r *pluginRegistry) RegisterPlugin(plugin bbrplugins.BBRPlugin) error {
59+
//validate whether this interface is supported
60+
alreadyRegistered := r.ContainsPlugin(plugin.TypedName().Type)
61+
62+
if alreadyRegistered {
63+
err := fmt.Errorf("plugin implementing interface type %s is already registered", plugin.TypedName().Type)
64+
return err
65+
}
66+
67+
// validate that the factory for this plugin is registered: always register factory before the plugin
68+
if _, ok := r.pluginsFactory[plugin.TypedName().Type]; !ok {
69+
err := fmt.Errorf("no plugin factory registered for plugin interface type %s", plugin.TypedName().Type)
70+
return err
71+
}
72+
r.plugins[plugin.TypedName().Type] = plugin
73+
74+
return nil
75+
}
76+
77+
// Retrieves a plugin factory by type key
78+
func (r *pluginRegistry) GetFactory(typeKey string) (PluginFactoryFunc, error) {
79+
if pluginFactory, ok := r.pluginsFactory[typeKey]; ok {
80+
return pluginFactory, nil
81+
}
82+
return nil, fmt.Errorf("plugin type %s not found", typeKey)
83+
}
84+
85+
// Retrieves a plugin instance by type key
86+
func (r *pluginRegistry) GetPlugin(typeKey string) (bbrplugins.BBRPlugin, error) {
87+
if plugin, ok := r.plugins[typeKey]; ok {
88+
return plugin, nil
89+
}
90+
return nil, fmt.Errorf("plugin type %s not found", typeKey)
91+
}
92+
93+
// Constructs a new plugin (a caller can perform either type assertion of a concrete implementation of the BBR plugin)
94+
func (r *pluginRegistry) CreatePlugin(typeKey string) (bbrplugins.BBRPlugin, error) {
95+
if factory, ok := r.pluginsFactory[typeKey]; ok {
96+
plugin := factory()
97+
return plugin, nil
98+
}
99+
return nil, fmt.Errorf("plugin %s not registered", typeKey)
100+
}
101+
102+
// Removes a plugin factory by type key
103+
func (r *pluginRegistry) UnregisterFactory(typeKey string) error {
104+
if _, ok := r.pluginsFactory[typeKey]; ok {
105+
delete(r.pluginsFactory, typeKey)
106+
return nil
107+
}
108+
return fmt.Errorf("plugin (%s) not found", typeKey)
109+
}
110+
111+
// ListPlugins lists all registered plugins
112+
func (r *pluginRegistry) ListPlugins() []string {
113+
typeKeys := make([]string, 0, len(r.plugins))
114+
for k := range r.plugins {
115+
typeKeys = append(typeKeys, k)
116+
}
117+
return typeKeys
118+
}
119+
120+
// ListPlugins lists all registered plugins; this functionis not really needed. Just for sanity checks and tests
121+
func (r *pluginRegistry) ListFactories() []string {
122+
typeKeys := make([]string, 0, len(r.pluginsFactory))
123+
for k := range r.pluginsFactory {
124+
typeKeys = append(typeKeys, k)
125+
}
126+
return typeKeys
127+
}
128+
129+
// Get factories
130+
func (r *pluginRegistry) GetFactories() map[string]PluginFactoryFunc {
131+
return r.pluginsFactory
132+
}
133+
134+
// Get plugins
135+
func (r *pluginRegistry) GetPlugins() map[string]bbrplugins.BBRPlugin {
136+
return r.plugins
137+
}
138+
139+
// Checks for presense of a factory in this registry
140+
func (r *pluginRegistry) ContainsFactory(typeKey string) bool {
141+
_, exists := r.pluginsFactory[typeKey]
142+
return exists
143+
}
144+
145+
// Helper: Checks for presense of a plugin in this registry
146+
func (r *pluginRegistry) ContainsPlugin(typeKey string) bool {
147+
_, exists := r.plugins[typeKey]
148+
return exists
149+
}
150+
151+
func (r *pluginRegistry) String() string {
152+
return fmt.Sprintf("{plugins=%v}{pluginsFactory=%v}", r.plugins, r.pluginsFactory)
153+
}
154+
155+
//-------------------------- PluginsChain implementation --------------------------
156+
157+
// PluginsChain is a sequence of plugins to be executed in order inside the ext_proc server
158+
type pluginsChain struct {
159+
plugins []string
160+
}
161+
162+
// NewPluginsChain creates a new PluginsChain instance
163+
func NewPluginsChain() PluginsChain {
164+
return &pluginsChain{
165+
plugins: []string{},
166+
}
167+
}
168+
169+
// AddPlugin adds a plugin to the chain
170+
func (pc *pluginsChain) AddPlugin(typeKey string, r PluginRegistry) error {
171+
// check whether this plugin was registered in the registry (i.e., the factory for the plugin exist and an instance was created)
172+
if ok := r.ContainsPlugin(typeKey); !ok {
173+
err := fmt.Errorf("plugin type %s not found", typeKey)
174+
return err
175+
}
176+
pc.plugins = append(pc.plugins, typeKey)
177+
178+
return nil
179+
}
180+
181+
// GetPlugin retrieves the next plugin in the chain by index
182+
func (pc *pluginsChain) GetPlugin(index int, r PluginRegistry) (bbrplugins.BBRPlugin, error) {
183+
if index < 0 || index >= len(pc.plugins) {
184+
return nil, fmt.Errorf("plugin index %d out of range", index)
185+
}
186+
plugins := r.GetPlugins()
187+
plugin, ok := plugins[pc.plugins[index]]
188+
if !ok {
189+
return nil, fmt.Errorf("plugin index %d is not found in the registry", index)
190+
}
191+
return plugin, nil
192+
}
193+
194+
// Length returns the number of plugins in the chain
195+
func (pc *pluginsChain) Length() int {
196+
return len(pc.plugins)
197+
}
198+
199+
// AddPluginInOrder inserts a plugin into the chain in the specified index
200+
func (pc *pluginsChain) AddPluginAtInd(typeKey string, i int, r PluginRegistry) error {
201+
if i < 0 || i > len(pc.plugins) {
202+
return fmt.Errorf("index %d is out of range", i)
203+
}
204+
// validate that the plugin is registered
205+
plugins := r.GetPlugins()
206+
if _, ok := plugins[pc.plugins[i]]; !ok {
207+
return fmt.Errorf("plugin index %d is not found in the registry", i)
208+
}
209+
pc.plugins = append(pc.plugins[:i], append([]string{typeKey}, pc.plugins[i:]...)...)
210+
return nil
211+
}
212+
213+
func (pc *pluginsChain) GetPlugins() []string {
214+
return pc.plugins
215+
}
216+
217+
// MergeMaps copies all key/value pairs from src into dst and returns dst.
218+
// If dst is nil a new map is allocated.
219+
// Existing keys in dst are not overwritten.
220+
// This is a helper function used to merge headers from multiple plugins safely.
221+
func MergeMaps(dst map[string]string, src map[string]string) map[string]string {
222+
if src == nil {
223+
if dst == nil {
224+
return map[string]string{}
225+
}
226+
return dst
227+
}
228+
if dst == nil {
229+
dst = make(map[string]string, len(src))
230+
}
231+
232+
for k, v := range src {
233+
if _, exists := dst[k]; !exists {
234+
dst[k] = v
235+
}
236+
}
237+
238+
return dst
239+
}
240+
241+
func (pc *pluginsChain) Run(
242+
bodyBytes []byte,
243+
metaDataKeys []string,
244+
r PluginRegistry,
245+
) (headers map[string]string, mutateBodyBytes []byte, err error) {
246+
247+
allHeaders := make(map[string]string)
248+
mutatedBodyBytes := bodyBytes
249+
250+
for i := range pc.Length() {
251+
plugin, _ := pc.GetPlugin(i, r)
252+
pluginType := plugin.TypedName().Type
253+
254+
metExtPlugin, err := r.GetPlugin(pluginType)
255+
256+
if err != nil {
257+
return allHeaders, bodyBytes, err
258+
}
259+
260+
// The plugin i in the chain receives the (potentially mutated) body from plugin i-1 in the chain
261+
headers, mutatedBodyBytes, err := metExtPlugin.Execute(mutatedBodyBytes, metaDataKeys)
262+
263+
if err != nil {
264+
return headers, mutatedBodyBytes, err
265+
}
266+
267+
//note that the existing overlapping keys are NOT over-written by merge
268+
MergeMaps(allHeaders, headers)
269+
}
270+
return allHeaders, mutatedBodyBytes, nil
271+
}
272+
273+
func (pc *pluginsChain) String() string {
274+
return fmt.Sprintf("PluginsChain{plugins=%v}", pc.plugins)
275+
}
276+
277+
// -------------------------- End of PluginsChain implementation --------------------------

pkg/bbr/handlers/request_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,19 @@ import (
3030
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
3131

3232
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
33+
utils "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/utils"
3334
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3435
)
3536

3637
func TestHandleRequestBody(t *testing.T) {
3738
metrics.Register()
3839
ctx := logutil.NewTestLoggerIntoContext(context.Background())
3940

41+
//set environment variables expected by the code under test
42+
//testing a minimal configuration
43+
//request plugin chain always contains the default bbr plugin that extracts a model name and sets it on the X-Gateway-Model-Name header
44+
t.Setenv("REQUEST_PLUGINS_CHAIN", "simple_model_extractor")
45+
4046
tests := []struct {
4147
name string
4248
body map[string]any
@@ -176,9 +182,15 @@ func TestHandleRequestBody(t *testing.T) {
176182
},
177183
}
178184

185+
//Initialize PluginRegistry and request/response PluginsChain instances based on the minimal configuration setting vi env vars
186+
registry, requestChain, responseChain, metaDataKeys, err := utils.InitPlugins()
187+
if err != nil {
188+
t.Fatalf("processRequestBody(): %v", err)
189+
}
190+
179191
for _, test := range tests {
180192
t.Run(test.name, func(t *testing.T) {
181-
server := &Server{streaming: test.streaming}
193+
server := NewServer(test.streaming, registry, requestChain, responseChain, metaDataKeys)
182194
bodyBytes, _ := json.Marshal(test.body)
183195
resp, err := server.HandleRequestBody(ctx, bodyBytes)
184196
if err != nil {

0 commit comments

Comments
 (0)