Merged
57 changes: 51 additions & 6 deletions cmd/vmcp/app/commands.go
@@ -4,10 +4,12 @@ package app
 import (
 	"context"
 	"fmt"
+	"os"
 	"time"
 
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
+	"k8s.io/client-go/rest"
 
 	"github.com/stacklok/toolhive/pkg/audit"
 	"github.com/stacklok/toolhive/pkg/env"
@@ -21,6 +23,7 @@ import (
 	"github.com/stacklok/toolhive/pkg/vmcp/config"
 	"github.com/stacklok/toolhive/pkg/vmcp/discovery"
 	"github.com/stacklok/toolhive/pkg/vmcp/health"
+	"github.com/stacklok/toolhive/pkg/vmcp/k8s"
 	vmcprouter "github.com/stacklok/toolhive/pkg/vmcp/router"
 	vmcpserver "github.com/stacklok/toolhive/pkg/vmcp/server"
 )
@@ -294,16 +297,56 @@ func runServe(cmd *cobra.Command, _ []string) error {
 	// Create aggregator
 	agg := aggregator.NewDefaultAggregator(backendClient, conflictResolver, cfg.Aggregation.Tools)
 
-	// Create backend registry for CLI environment
-	// CLI always uses immutable registry (backends fixed at startup)
-	backendRegistry := vmcp.NewImmutableRegistry(backends)
+	// Use DynamicRegistry for version-based cache invalidation.
+	// Works in both standalone (CLI with YAML config) and Kubernetes (operator-deployed) modes:
+	// in standalone mode, backends come from the config file and are never updated;
+	// in K8s mode with discovered auth, backends are watched dynamically via BackendWatcher.
+	dynamicRegistry := vmcp.NewDynamicRegistry(backends)
+	backendRegistry := vmcp.BackendRegistry(dynamicRegistry)
 
-	// Use standard manager (no version-based invalidation needed)
-	discoveryMgr, err := discovery.NewManager(agg)
+	// Use NewManagerWithRegistry to enable version-based cache invalidation
+	discoveryMgr, err := discovery.NewManagerWithRegistry(agg, dynamicRegistry)
 	if err != nil {
 		return fmt.Errorf("failed to create discovery manager: %w", err)
 	}
-	logger.Info("Immutable backend registry created for CLI environment")
+	logger.Info("Dynamic backend registry enabled")
+
+	// Backend watcher for dynamic backend discovery
+	var backendWatcher *k8s.BackendWatcher
+
+	// If outgoingAuth.source is "discovered", start the K8s backend watcher to track backend changes
+	if cfg.OutgoingAuth != nil && cfg.OutgoingAuth.Source == "discovered" {
+		logger.Info("Detected dynamic backend discovery mode (outgoingAuth.source: discovered)")
+
+		// Get in-cluster REST config
+		restConfig, err := rest.InClusterConfig()
+		if err != nil {
+			return fmt.Errorf("failed to get in-cluster config: %w", err)
+		}
+
+		// Get the namespace from the environment variable set by the operator;
+		// the operator sets VMCP_NAMESPACE to the VirtualMCPServer's namespace.
+		namespace := os.Getenv("VMCP_NAMESPACE")
+		if namespace == "" {
+			return fmt.Errorf("VMCP_NAMESPACE environment variable not set")
+		}
+
+		// Create the K8s backend watcher for the referenced MCPGroup
+		backendWatcher, err = k8s.NewBackendWatcher(restConfig, namespace, cfg.Group, dynamicRegistry)
+		if err != nil {
+			return fmt.Errorf("failed to create backend watcher: %w", err)
+		}
+
+		// Start the K8s backend watcher in a background goroutine
+		go func() {
+			logger.Info("Starting Kubernetes backend watcher in background")
+			if err := backendWatcher.Start(ctx); err != nil {
+				logger.Errorf("Backend watcher stopped with error: %v", err)
+			}
+		}()
+
+		logger.Info("Kubernetes backend watcher started for dynamic backend discovery")
+	}
 
 	// Create router
 	rtr := vmcprouter.NewDefaultRouter()
@@ -362,13 +405,15 @@ func runServe(cmd *cobra.Command, _ []string) error {
 	serverCfg := &vmcpserver.Config{
 		Name:                cfg.Name,
 		Version:             getVersion(),
+		GroupRef:            cfg.Group,
 		Host:                host,
 		Port:                port,
 		AuthMiddleware:      authMiddleware,
 		AuthInfoHandler:     authInfoHandler,
 		TelemetryProvider:   telemetryProvider,
 		AuditConfig:         cfg.Audit,
 		HealthMonitorConfig: healthMonitorConfig,
+		Watcher:             backendWatcher,
 	}
 
 	// Convert composite tool configurations to workflow definitions
258 changes: 258 additions & 0 deletions pkg/vmcp/k8s/manager.go
@@ -0,0 +1,258 @@
// Package k8s provides Kubernetes integration for Virtual MCP Server dynamic mode.
//
// In dynamic mode (outgoingAuth.source: discovered), the vMCP server runs a
// controller-runtime manager with informers to watch K8s resources dynamically.
// This enables backends to be added/removed from the MCPGroup without restarting.
package k8s

import (
"context"
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/manager"

mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
"github.com/stacklok/toolhive/pkg/logger"
"github.com/stacklok/toolhive/pkg/vmcp"
)

// BackendWatcher wraps a controller-runtime manager for vMCP dynamic mode.
//
// In K8s mode (outgoingAuth.source: discovered), this watcher runs informers
// that watch for backend changes in the referenced MCPGroup. When backends
// are added or removed, the watcher updates the DynamicRegistry which triggers
// cache invalidation via version-based lazy invalidation.
//
// Design Philosophy:
// - Wraps controller-runtime manager for lifecycle management
// - Provides WaitForCacheSync for readiness probe gating
// - Graceful shutdown on context cancellation
// - Single responsibility: watch K8s resources and update registry
//
// Static mode (CLI) skips this entirely - no controller-runtime, no informers.
type BackendWatcher struct {
// ctrlManager is the underlying controller-runtime manager
ctrlManager manager.Manager

// namespace is the namespace to watch for resources
namespace string

// groupRef identifies the MCPGroup to watch (format: "namespace/name")
groupRef string

// registry is the DynamicRegistry to update when backends change
registry vmcp.DynamicRegistry

// mu protects the started field for thread-safe access
mu sync.Mutex

// started tracks if the watcher has been started (protected by mu)
started bool
}

// NewBackendWatcher creates a new backend watcher for vMCP dynamic mode.
//
// This initializes a controller-runtime manager configured to watch resources
// in the specified namespace. The watcher will monitor the referenced MCPGroup
// and update the DynamicRegistry when backends are added or removed.
//
// Parameters:
// - cfg: Kubernetes REST config (typically from in-cluster config)
// - namespace: Namespace to watch for resources
// - groupRef: MCPGroup reference in "namespace/name" format
// - registry: DynamicRegistry to update when backends change
//
// Returns:
// - *BackendWatcher: Configured watcher ready to Start()
// - error: Configuration or initialization errors
//
// Example:
//
// restConfig, _ := rest.InClusterConfig()
// registry := vmcp.NewDynamicRegistry(initialBackends)
// watcher, err := k8s.NewBackendWatcher(restConfig, "default", "default/my-group", registry)
// if err != nil {
// return err
// }
// go watcher.Start(ctx)
// if !watcher.WaitForCacheSync(ctx) {
// return fmt.Errorf("cache sync failed")
// }
func NewBackendWatcher(
cfg *rest.Config,
namespace string,
groupRef string,
registry vmcp.DynamicRegistry,
) (*BackendWatcher, error) {
if cfg == nil {
return nil, fmt.Errorf("rest config cannot be nil")
}
if namespace == "" {
return nil, fmt.Errorf("namespace cannot be empty")
}
if groupRef == "" {
return nil, fmt.Errorf("groupRef cannot be empty")
}
if registry == nil {
return nil, fmt.Errorf("registry cannot be nil")
}

// Create runtime scheme and register ToolHive CRDs
scheme := runtime.NewScheme()
if err := mcpv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register ToolHive CRDs to scheme: %w", err)
}

// Create controller-runtime manager with namespace-scoped cache
ctrlManager, err := ctrl.NewManager(cfg, manager.Options{
Scheme: scheme,
Cache: cache.Options{
DefaultNamespaces: map[string]cache.Config{
namespace: {},
},
},
// Disable health probes - vMCP server handles its own
HealthProbeBindAddress: "0",
// Leader election not needed for vMCP (single replica per VirtualMCPServer)
LeaderElection: false,
})
if err != nil {
return nil, fmt.Errorf("failed to create controller manager: %w", err)
}

return &BackendWatcher{
ctrlManager: ctrlManager,
namespace: namespace,
groupRef: groupRef,
registry: registry,
started: false,
}, nil
}

// Start starts the controller-runtime manager and blocks until context is cancelled.
//
// This method runs informers that watch for backend changes in the MCPGroup.
// It's designed to run in a background goroutine and will gracefully shutdown
// when the context is cancelled.
//
// Design Notes:
// - Blocks until context cancellation (controller-runtime pattern)
// - Graceful shutdown on context cancel
// - Safe to call only once (subsequent calls will error)
//
// Example:
//
// go func() {
// if err := watcher.Start(ctx); err != nil {
// logger.Errorf("BackendWatcher stopped with error: %v", err)
// }
// }()
func (w *BackendWatcher) Start(ctx context.Context) error {
w.mu.Lock()
if w.started {
w.mu.Unlock()
return fmt.Errorf("watcher already started")
}
w.started = true
w.mu.Unlock()

logger.Info("Starting Kubernetes backend watcher for vMCP dynamic mode")
logger.Infof("Watching namespace: %s, group: %s", w.namespace, w.groupRef)

// TODO: Add backend watcher controller
// err := w.addBackendWatchController()
// if err != nil {
// return fmt.Errorf("failed to add backend watch controller: %w", err)
// }

// Start the manager (blocks until context cancelled)
if err := w.ctrlManager.Start(ctx); err != nil {
return fmt.Errorf("watcher failed: %w", err)
}

logger.Info("Kubernetes backend watcher stopped")
return nil
}

// WaitForCacheSync waits for the watcher's informer caches to sync.
//
// This is used by the /readyz endpoint to gate readiness until the watcher
// has populated its caches. This ensures the vMCP server doesn't serve requests
// until it has an accurate view of backends.
//
// Parameters:
// - ctx: Context with optional timeout for the wait operation
//
// Returns:
// - bool: true if caches synced successfully, false on timeout or error
//
// Design Notes:
// - Non-blocking if watcher not started (returns false)
// - Respects context timeout (e.g., 5-second readiness probe timeout)
// - Safe to call multiple times (idempotent)
//
// Example (readiness probe):
//
// func (s *Server) handleReadiness(w http.ResponseWriter, r *http.Request) {
// if s.backendWatcher != nil {
// ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
// defer cancel()
// if !s.backendWatcher.WaitForCacheSync(ctx) {
// w.WriteHeader(http.StatusServiceUnavailable)
// return
// }
// }
// w.WriteHeader(http.StatusOK)
// }
func (w *BackendWatcher) WaitForCacheSync(ctx context.Context) bool {
w.mu.Lock()
started := w.started
w.mu.Unlock()

if !started {
logger.Warn("WaitForCacheSync called but watcher not started")
return false
}

// Get the cache from the manager
informerCache := w.ctrlManager.GetCache()

// Create a timeout context if not already set
// Default to 30 seconds to handle typical K8s API latency
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 30*time.Second)
defer cancel()
}

logger.Info("Waiting for Kubernetes cache sync...")

// Wait for cache to sync
synced := informerCache.WaitForCacheSync(ctx)
if !synced {
logger.Warn("Cache sync timed out or failed")
return false
}

logger.Info("Kubernetes cache synced successfully")
return true
}

// TODO: Add backend watch controller implementation in next phase
// This will watch the MCPGroup and call registry.Upsert/Remove when backends change
// func (w *BackendWatcher) addBackendWatchController() error {
// // Create reconciler that watches MCPGroup
// // On reconcile:
// // 1. Get MCPGroup spec
// // 2. Extract backend list
// // 3. Call registry.Upsert for new/updated backends
// // 4. Call registry.Remove for deleted backends
// // This triggers cache invalidation via version increment
// return nil
// }
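// For illustration only, one possible shape for that future controller is sketched
// below. This is an editor's sketch, not part of this change: it assumes DynamicRegistry
// exposes Upsert/Remove as described above (signatures unverified), and backendsFromGroup
// is a hypothetical helper that maps the MCPGroup spec to vmcp.Backend values. It would
// also require the additional imports "sigs.k8s.io/controller-runtime/pkg/client" and
// "sigs.k8s.io/controller-runtime/pkg/reconcile".
//
// func (w *BackendWatcher) addBackendWatchController() error {
// 	return ctrl.NewControllerManagedBy(w.ctrlManager).
// 		For(&mcpv1alpha1.MCPGroup{}).
// 		Complete(reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
// 			// Ignore MCPGroups other than the one this vMCP instance references
// 			// (groupRef is "namespace/name", matching NamespacedName.String()).
// 			if req.NamespacedName.String() != w.groupRef {
// 				return reconcile.Result{}, nil
// 			}
// 			var group mcpv1alpha1.MCPGroup
// 			if err := w.ctrlManager.GetClient().Get(ctx, req.NamespacedName, &group); err != nil {
// 				return reconcile.Result{}, client.IgnoreNotFound(err)
// 			}
// 			// Hypothetical helper: translate the group's spec into vmcp.Backend values,
// 			// then upsert each one; removal of vanished backends is omitted in this sketch.
// 			for _, b := range backendsFromGroup(&group) {
// 				w.registry.Upsert(b) // assumed signature; bumps the registry version for lazy cache invalidation
// 			}
// 			return reconcile.Result{}, nil
// 		}))
// }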