Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {

func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *configapi.EndpointPickerConfig, ds datastore.Datastore) (*config.Config, error) {
logger := log.FromContext(ctx)
handle := plugins.NewEppHandle(ctx, makePodListFunc(ds))
handle := plugins.NewEppHandle(ctx, rawConfig.Plugins, nil, makePodListFunc(ds))
cfg, err := loader.InstantiateAndConfigure(rawConfig, handle, logger)

if err != nil {
Expand All @@ -513,7 +513,6 @@ func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *conf
return nil, errors.New("failed to load the configuration - prepare data plugins have cyclic dependencies")
}

// Handler deprecated configuration options
r.deprecatedConfigurationHelper(cfg, logger)

logger.Info("loaded configuration from file/text successfully")
Expand Down
10 changes: 8 additions & 2 deletions pkg/epp/config/loader/configloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,18 @@ func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins
}
pluginNames.Insert(spec.Name)

factory, ok := plugins.Registry[spec.Type]
reg, ok := plugins.Registry[spec.Type]
if !ok {
return fmt.Errorf("plugin type '%s' is not registered", spec.Type)
}

plugin, err := factory(spec.Name, spec.Parameters, handle)
// If this is a Transient plugin, we DO NOT instantiate it here.
// It sits in the Handle.PluginSpecs() waiting for the Factory to create instances on demand.
if reg.Lifecycle == plugins.LifecycleTransient {
continue
}

plugin, err := reg.Factory(spec.Name, spec.Parameters, handle)
if err != nil {
return fmt.Errorf("failed to create plugin '%s' (type: %s): %w", spec.Name, spec.Type, err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/config/loader/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ func registerDefaultPlugin(
pluginType string,
) error {
name := pluginType
factory, ok := plugins.Registry[pluginType]
reg, ok := plugins.Registry[pluginType]
if !ok {
return fmt.Errorf("plugin type '%s' not found in registry", pluginType)
}

plugin, err := factory(name, nil, handle)
plugin, err := reg.Factory(name, nil, handle)
if err != nil {
return fmt.Errorf("failed to instantiate default plugin '%s': %w", name, err)
}
Expand Down
51 changes: 51 additions & 0 deletions pkg/epp/plugins/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package plugins provides the core extensibility framework for the Endpoint Picker (EPP).
// It enables the system to be composed of modular, pluggable components that handle everything from
// request scheduling to flow control and data enrichment.
//
// # Core Concepts
//
// 1. Registry & Lifecycle
//
// Plugins are registered globally via the Registry. The system supports two lifecycles:
//
// - Singleton (Default): Instantiated once at startup.
// - Transient: Instantiated on-demand at runtime via a Factory.
//
// 2. Configuration (Handle)
//
// The Handle serves as the bridge between the configuration (YAML) and the runtime.
// It holds "Blueprints" (config for Transient plugins) and references to "Instances" (active
// Singleton plugins).
//
// 3. Factory
//
// The PluginFactory is the mechanism for creating Transient plugins. It resolves a configuration
// blueprint from the Handle and instantiates a new, distinct plugin instance, allowing for unique
// runtime identities (e.g., "tenant-a-queue").
//
// Architectural Distinction: The DAG vs. Flow Control
//
// The EPP Request Control DAG operates exclusively on Singleton plugins.
// It relies on a static topological sort performed at startup.
//
// Transient plugins (LifecycleTransient) do NOT participate in the global Request Control DAG.
// They live inside specific subsystems (like the Flow Control Layer) and have their own independent
// lifecycles managed by that subsystem. They are not accessible via handle.GetAllPlugins() and
// cannot be configured as dependencies for PrepareData plugins.
package plugins
90 changes: 90 additions & 0 deletions pkg/epp/plugins/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugins

import (
"fmt"
)

// PluginFactory abstracts the creation of transient plugin instances.
type PluginFactory interface {
// NewPlugin creates a fresh instance of a plugin based on a configuration blueprint.
//
// Arguments:
// - blueprintName: The name of the PluginSpec in the configuration (e.g., "standard-fairness-policy").
// - instanceAlias: An optional runtime name for this specific instance (e.g., "tenant-a").
// If empty, the blueprintName is used.
NewPlugin(blueprintName string, instanceAlias string) (Plugin, error)
}

// EPPPluginFactory is the concrete implementation of PluginFactory.
// It ties together the configuration (Handle) and the implementation (Registry).
type EPPPluginFactory struct {
handle Handle
}

// NewEPPPluginFactory returns a new factory instance.
func NewEPPPluginFactory(handle Handle) *EPPPluginFactory {
return &EPPPluginFactory{handle: handle}
}

// NewPlugin implements PluginFactory.
func (f *EPPPluginFactory) NewPlugin(blueprintName string, instanceAlias string) (Plugin, error) {
spec := f.handle.PluginSpec(blueprintName)
if spec == nil {
return nil, fmt.Errorf("plugin blueprint %q not found in configuration", blueprintName)
}

reg, ok := Registry[spec.Type]
if !ok {
return nil, fmt.Errorf("plugin type %q (referenced by blueprint %q) is not registered", spec.Type, blueprintName)
}

// Determine runtime identity. This ensures that structured logs and internal maps can distinguish between different
// instances of the same plugin type.
finalName := spec.Name
if instanceAlias != "" {
finalName = instanceAlias
}

plugin, err := reg.Factory(finalName, spec.Parameters, f.handle)
if err != nil {
return nil, fmt.Errorf("failed to instantiate plugin %q (type %s): %w", finalName, spec.Type, err)
}

return plugin, nil
}

// NewPluginByType is a helper to create a plugin and assert its type in one step.
func NewPluginByType[T Plugin](factory PluginFactory, blueprintName string, instanceAlias string) (T, error) {
var zero T

rawPlugin, err := factory.NewPlugin(blueprintName, instanceAlias)
if err != nil {
return zero, err
}

plugin, ok := rawPlugin.(T)
if !ok {
return zero, fmt.Errorf(
"plugin created from blueprint %q is type %T, but expected %T",
blueprintName, rawPlugin, zero,
)
}

return plugin, nil
}
176 changes: 176 additions & 0 deletions pkg/epp/plugins/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugins

import (
"context"
"encoding/json"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
)

// transientPlugin is a mock implementation that captures how it was created.
type transientPlugin struct {
Name string
Parameters string
}

func (p *transientPlugin) TypedName() TypedName {
return TypedName{Type: "transientPlugin", Name: p.Name}
}

// transientFactory creates transientPlugin instances.
func transientFactory(name string, parameters json.RawMessage, _ Handle) (Plugin, error) {
if string(parameters) == `"fail-me"` {
return nil, errors.New("intentional factory failure")
}
return &transientPlugin{
Name: name,
Parameters: string(parameters),
}, nil
}

func TestEPPPluginFactory_NewPlugin(t *testing.T) {
// Register a known type for testing factories.
// We use a unique name to ensure parallel safety if other tests use the registry.
const factoryType = "test-transient-factory-type"
RegisterWithMetadata(factoryType, PluginRegistration{
Factory: transientFactory,
Lifecycle: LifecycleTransient,
})

t.Parallel()

specs := []configapi.PluginSpec{
{
Name: "blueprint-default",
Type: factoryType,
Parameters: json.RawMessage(`{"key": "val"}`),
},
{
Name: "blueprint-broken-factory",
Type: factoryType,
Parameters: json.RawMessage(`"fail-me"`),
},
{
Name: "blueprint-missing-type",
Type: "unknown-type",
},
}

handle := NewEppHandle(context.Background(), specs, nil, nil)
factory := NewEPPPluginFactory(handle)

tests := []struct {
name string
blueprintName string
instanceAlias string
expectErr bool
errorContains string
expectedName string // The name the plugin instance should think it has.
expectedParams string
}{
{
name: "success_standard_creation",
blueprintName: "blueprint-default",
instanceAlias: "", // No alias
expectErr: false,
expectedName: "blueprint-default", // Should default to blueprint name.
expectedParams: `{"key": "val"}`,
},
{
name: "success_with_instance_alias",
blueprintName: "blueprint-default",
instanceAlias: "tenant-a-queue",
expectErr: false,
expectedName: "tenant-a-queue", // Should take the alias.
expectedParams: `{"key": "val"}`,
},
{
name: "fail_blueprint_not_found",
blueprintName: "non-existent-blueprint",
expectErr: true,
errorContains: "blueprint \"non-existent-blueprint\" not found",
},
{
name: "fail_plugin_type_not_registered",
blueprintName: "blueprint-missing-type",
expectErr: true,
errorContains: "plugin type \"unknown-type\" (referenced by blueprint \"blueprint-missing-type\") is not registered",
},
{
name: "fail_factory_returns_error",
blueprintName: "blueprint-broken-factory",
expectErr: true,
errorContains: "failed to instantiate plugin",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

plugin, err := factory.NewPlugin(tc.blueprintName, tc.instanceAlias)

if tc.expectErr {
require.Error(t, err)
assert.Nil(t, plugin)
assert.Contains(t, err.Error(), tc.errorContains)
} else {
require.NoError(t, err)
require.NotNil(t, plugin)

// Cast to verify internal state
p, ok := plugin.(*transientPlugin)
require.True(t, ok, "plugin should be of type *transientPlugin")
assert.Equal(t, tc.expectedName, p.Name, "plugin name should match expected identity")
assert.JSONEq(t, tc.expectedParams, p.Parameters, "parameters should be passed through correctly")
}
})
}
}

func TestNewPluginByType_GenericHelper(t *testing.T) {
const helperType = "test-helper-type"
RegisterWithMetadata(helperType, PluginRegistration{
Factory: transientFactory,
Lifecycle: LifecycleTransient,
})

t.Parallel()

specs := []configapi.PluginSpec{{Name: "bp", Type: helperType}}
handle := NewEppHandle(context.Background(), specs, nil, nil)
factory := NewEPPPluginFactory(handle)

t.Run("success_cast", func(t *testing.T) {
p, err := NewPluginByType[*transientPlugin](factory, "bp", "alias")
require.NoError(t, err)
assert.Equal(t, "alias", p.Name)
})

t.Run("fail_cast_mismatch", func(t *testing.T) {
// Try to cast the transientPlugin to mockPluginImpl (which it is not).
_, err := NewPluginByType[*mockPluginImpl](factory, "bp", "alias")
require.Error(t, err)
assert.Contains(t, err.Error(), "is type *plugins.transientPlugin, but expected *plugins.mockPluginImpl")
})
}
Loading