diff --git a/cmd/vmcp/app/commands.go b/cmd/vmcp/app/commands.go index f986ec973c..ca6060bcab 100644 --- a/cmd/vmcp/app/commands.go +++ b/cmd/vmcp/app/commands.go @@ -4,10 +4,12 @@ package app import ( "context" "fmt" + "os" "time" "github.com/spf13/cobra" "github.com/spf13/viper" + "k8s.io/client-go/rest" "github.com/stacklok/toolhive/pkg/audit" "github.com/stacklok/toolhive/pkg/env" @@ -21,6 +23,7 @@ import ( "github.com/stacklok/toolhive/pkg/vmcp/config" "github.com/stacklok/toolhive/pkg/vmcp/discovery" "github.com/stacklok/toolhive/pkg/vmcp/health" + "github.com/stacklok/toolhive/pkg/vmcp/k8s" vmcprouter "github.com/stacklok/toolhive/pkg/vmcp/router" vmcpserver "github.com/stacklok/toolhive/pkg/vmcp/server" ) @@ -294,16 +297,56 @@ func runServe(cmd *cobra.Command, _ []string) error { // Create aggregator agg := aggregator.NewDefaultAggregator(backendClient, conflictResolver, cfg.Aggregation.Tools) - // Create backend registry for CLI environment - // CLI always uses immutable registry (backends fixed at startup) - backendRegistry := vmcp.NewImmutableRegistry(backends) + // Use DynamicRegistry for version-based cache invalidation + // Works in both standalone (CLI with YAML config) and Kubernetes (operator-deployed) modes + // In standalone mode: backends from config file, no dynamic updates + // In K8s mode with discovered auth: backends watched dynamically via BackendWatcher + dynamicRegistry := vmcp.NewDynamicRegistry(backends) + backendRegistry := vmcp.BackendRegistry(dynamicRegistry) - // Use standard manager (no version-based invalidation needed) - discoveryMgr, err := discovery.NewManager(agg) + // Use NewManagerWithRegistry to enable version-based cache invalidation + discoveryMgr, err := discovery.NewManagerWithRegistry(agg, dynamicRegistry) if err != nil { return fmt.Errorf("failed to create discovery manager: %w", err) } - logger.Info("Immutable backend registry created for CLI environment") + logger.Info("Dynamic backend registry enabled") + + // Backend watcher for dynamic backend discovery + var backendWatcher *k8s.BackendWatcher + + // If outgoingAuth.source is "discovered", start the K8s backend watcher + if cfg.OutgoingAuth != nil && cfg.OutgoingAuth.Source == "discovered" { + logger.Info("Detected dynamic backend discovery mode (outgoingAuth.source: discovered)") + + // Get in-cluster REST config + restConfig, err := rest.InClusterConfig() + if err != nil { + return fmt.Errorf("failed to get in-cluster config: %w", err) + } + + // Get namespace from environment variable set by operator + // The operator sets VMCP_NAMESPACE to the VirtualMCPServer's namespace + namespace := os.Getenv("VMCP_NAMESPACE") + if namespace == "" { + return fmt.Errorf("VMCP_NAMESPACE environment variable not set") + } + + // Create the K8s backend watcher + backendWatcher, err = k8s.NewBackendWatcher(restConfig, namespace, cfg.Group, dynamicRegistry) + if err != nil { + return fmt.Errorf("failed to create backend watcher: %w", err) + } + + // Start K8s backend watcher in background goroutine + go func() { + logger.Info("Starting Kubernetes backend watcher in background") + if err := backendWatcher.Start(ctx); err != nil { + logger.Errorf("Backend watcher stopped with error: %v", err) + } + }() + + logger.Info("Kubernetes backend watcher started for dynamic backend discovery") + } // Create router rtr := vmcprouter.NewDefaultRouter() @@ -362,6 +405,7 @@ func runServe(cmd *cobra.Command, _ []string) error { serverCfg := 
&vmcpserver.Config{ Name: cfg.Name, Version: getVersion(), + GroupRef: cfg.Group, Host: host, Port: port, AuthMiddleware: authMiddleware, @@ -369,6 +413,7 @@ func runServe(cmd *cobra.Command, _ []string) error { TelemetryProvider: telemetryProvider, AuditConfig: cfg.Audit, HealthMonitorConfig: healthMonitorConfig, + Watcher: backendWatcher, } // Convert composite tool configurations to workflow definitions diff --git a/pkg/vmcp/k8s/manager.go b/pkg/vmcp/k8s/manager.go new file mode 100644 index 0000000000..7c098cda38 --- /dev/null +++ b/pkg/vmcp/k8s/manager.go @@ -0,0 +1,258 @@ +// Package k8s provides Kubernetes integration for Virtual MCP Server dynamic mode. +// +// In dynamic mode (outgoingAuth.source: discovered), the vMCP server runs a +// controller-runtime manager with informers to watch K8s resources dynamically. +// This enables backends to be added/removed from the MCPGroup without restarting. +package k8s + +import ( + "context" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/manager" + + mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1" + "github.com/stacklok/toolhive/pkg/logger" + "github.com/stacklok/toolhive/pkg/vmcp" +) + +// BackendWatcher wraps a controller-runtime manager for vMCP dynamic mode. +// +// In K8s mode (outgoingAuth.source: discovered), this watcher runs informers +// that watch for backend changes in the referenced MCPGroup. When backends +// are added or removed, the watcher updates the DynamicRegistry which triggers +// cache invalidation via version-based lazy invalidation. +// +// Design Philosophy: +// - Wraps controller-runtime manager for lifecycle management +// - Provides WaitForCacheSync for readiness probe gating +// - Graceful shutdown on context cancellation +// - Single responsibility: watch K8s resources and update registry +// +// Static mode (CLI) skips this entirely - no controller-runtime, no informers. +type BackendWatcher struct { + // ctrlManager is the underlying controller-runtime manager + ctrlManager manager.Manager + + // namespace is the namespace to watch for resources + namespace string + + // groupRef identifies the MCPGroup to watch (format: "namespace/name") + groupRef string + + // registry is the DynamicRegistry to update when backends change + registry vmcp.DynamicRegistry + + // mu protects the started field for thread-safe access + mu sync.Mutex + + // started tracks if the watcher has been started (protected by mu) + started bool +} + +// NewBackendWatcher creates a new backend watcher for vMCP dynamic mode. +// +// This initializes a controller-runtime manager configured to watch resources +// in the specified namespace. The watcher will monitor the referenced MCPGroup +// and update the DynamicRegistry when backends are added or removed. 
+// +// Parameters: +// - cfg: Kubernetes REST config (typically from in-cluster config) +// - namespace: Namespace to watch for resources +// - groupRef: MCPGroup reference in "namespace/name" format +// - registry: DynamicRegistry to update when backends change +// +// Returns: +// - *BackendWatcher: Configured watcher ready to Start() +// - error: Configuration or initialization errors +// +// Example: +// +// restConfig, _ := rest.InClusterConfig() +// registry := vmcp.NewDynamicRegistry(initialBackends) +// watcher, err := k8s.NewBackendWatcher(restConfig, "default", "default/my-group", registry) +// if err != nil { +// return err +// } +// go watcher.Start(ctx) +// if !watcher.WaitForCacheSync(ctx) { +// return fmt.Errorf("cache sync failed") +// } +func NewBackendWatcher( + cfg *rest.Config, + namespace string, + groupRef string, + registry vmcp.DynamicRegistry, +) (*BackendWatcher, error) { + if cfg == nil { + return nil, fmt.Errorf("rest config cannot be nil") + } + if namespace == "" { + return nil, fmt.Errorf("namespace cannot be empty") + } + if groupRef == "" { + return nil, fmt.Errorf("groupRef cannot be empty") + } + if registry == nil { + return nil, fmt.Errorf("registry cannot be nil") + } + + // Create runtime scheme and register ToolHive CRDs + scheme := runtime.NewScheme() + if err := mcpv1alpha1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("failed to register ToolHive CRDs to scheme: %w", err) + } + + // Create controller-runtime manager with namespace-scoped cache + ctrlManager, err := ctrl.NewManager(cfg, manager.Options{ + Scheme: scheme, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{ + namespace: {}, + }, + }, + // Disable health probes - vMCP server handles its own + HealthProbeBindAddress: "0", + // Leader election not needed for vMCP (single replica per VirtualMCPServer) + LeaderElection: false, + }) + if err != nil { + return nil, fmt.Errorf("failed to create controller manager: %w", err) + } + + return &BackendWatcher{ + ctrlManager: ctrlManager, + namespace: namespace, + groupRef: groupRef, + registry: registry, + started: false, + }, nil +} + +// Start starts the controller-runtime manager and blocks until context is cancelled. +// +// This method runs informers that watch for backend changes in the MCPGroup. +// It's designed to run in a background goroutine and will gracefully shutdown +// when the context is cancelled. 
+// +// Design Notes: +// - Blocks until context cancellation (controller-runtime pattern) +// - Graceful shutdown on context cancel +// - Safe to call only once (subsequent calls will error) +// +// Example: +// +// go func() { +// if err := watcher.Start(ctx); err != nil { +// logger.Errorf("BackendWatcher stopped with error: %v", err) +// } +// }() +func (w *BackendWatcher) Start(ctx context.Context) error { + w.mu.Lock() + if w.started { + w.mu.Unlock() + return fmt.Errorf("watcher already started") + } + w.started = true + w.mu.Unlock() + + logger.Info("Starting Kubernetes backend watcher for vMCP dynamic mode") + logger.Infof("Watching namespace: %s, group: %s", w.namespace, w.groupRef) + + // TODO: Add backend watcher controller + // err := w.addBackendWatchController() + // if err != nil { + // return fmt.Errorf("failed to add backend watch controller: %w", err) + // } + + // Start the manager (blocks until context cancelled) + if err := w.ctrlManager.Start(ctx); err != nil { + return fmt.Errorf("watcher failed: %w", err) + } + + logger.Info("Kubernetes backend watcher stopped") + return nil +} + +// WaitForCacheSync waits for the watcher's informer caches to sync. +// +// This is used by the /readyz endpoint to gate readiness until the watcher +// has populated its caches. This ensures the vMCP server doesn't serve requests +// until it has an accurate view of backends. +// +// Parameters: +// - ctx: Context with optional timeout for the wait operation +// +// Returns: +// - bool: true if caches synced successfully, false on timeout or error +// +// Design Notes: +// - Non-blocking if watcher not started (returns false) +// - Respects context timeout (e.g., 5-second readiness probe timeout) +// - Safe to call multiple times (idempotent) +// +// Example (readiness probe): +// +// func (s *Server) handleReadiness(w http.ResponseWriter, r *http.Request) { +// if s.backendWatcher != nil { +// ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) +// defer cancel() +// if !s.backendWatcher.WaitForCacheSync(ctx) { +// w.WriteHeader(http.StatusServiceUnavailable) +// return +// } +// } +// w.WriteHeader(http.StatusOK) +// } +func (w *BackendWatcher) WaitForCacheSync(ctx context.Context) bool { + w.mu.Lock() + started := w.started + w.mu.Unlock() + + if !started { + logger.Warn("WaitForCacheSync called but watcher not started") + return false + } + + // Get the cache from the manager + informerCache := w.ctrlManager.GetCache() + + // Create a timeout context if not already set + // Default to 30 seconds to handle typical K8s API latency + if _, hasDeadline := ctx.Deadline(); !hasDeadline { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 30*time.Second) + defer cancel() + } + + logger.Info("Waiting for Kubernetes cache sync...") + + // Wait for cache to sync + synced := informerCache.WaitForCacheSync(ctx) + if !synced { + logger.Warn("Cache sync timed out or failed") + return false + } + + logger.Info("Kubernetes cache synced successfully") + return true +} + +// TODO: Add backend watch controller implementation in next phase +// This will watch the MCPGroup and call registry.Upsert/Remove when backends change +// func (w *BackendWatcher) addBackendWatchController() error { +// // Create reconciler that watches MCPGroup +// // On reconcile: +// // 1. Get MCPGroup spec +// // 2. Extract backend list +// // 3. Call registry.Upsert for new/updated backends +// // 4. 
Call registry.Remove for deleted backends +// // This triggers cache invalidation via version increment +// return nil +// } diff --git a/pkg/vmcp/k8s/manager_test.go b/pkg/vmcp/k8s/manager_test.go new file mode 100644 index 0000000000..d9e8e6fe57 --- /dev/null +++ b/pkg/vmcp/k8s/manager_test.go @@ -0,0 +1,263 @@ +package k8s_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/rest" + + "github.com/stacklok/toolhive/pkg/vmcp" + "github.com/stacklok/toolhive/pkg/vmcp/k8s" +) + +// TestNewBackendWatcher tests the backend watcher factory function validation +func TestNewBackendWatcher(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + cfg *rest.Config + namespace string + groupRef string + registry vmcp.DynamicRegistry + expectedError string + }{ + { + name: "nil config", + cfg: nil, + namespace: "default", + groupRef: "default/test-group", + registry: vmcp.NewDynamicRegistry([]vmcp.Backend{}), + expectedError: "rest config cannot be nil", + }, + { + name: "empty namespace", + cfg: &rest.Config{}, + namespace: "", + groupRef: "default/test-group", + registry: vmcp.NewDynamicRegistry([]vmcp.Backend{}), + expectedError: "namespace cannot be empty", + }, + { + name: "empty groupRef", + cfg: &rest.Config{}, + namespace: "default", + groupRef: "", + registry: vmcp.NewDynamicRegistry([]vmcp.Backend{}), + expectedError: "groupRef cannot be empty", + }, + { + name: "nil registry", + cfg: &rest.Config{}, + namespace: "default", + groupRef: "default/test-group", + registry: nil, + expectedError: "registry cannot be nil", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + mgr, err := k8s.NewBackendWatcher(tc.cfg, tc.namespace, tc.groupRef, tc.registry) + + if tc.expectedError != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expectedError) + assert.Nil(t, mgr) + } else { + require.NoError(t, err) + assert.NotNil(t, mgr) + } + }) + } +} + +// TestNewBackendWatcher_ValidInputs tests that NewBackendWatcher succeeds with valid inputs +// Note: This test validates that the watcher can be created, but doesn't start it +// to avoid requiring kubebuilder/envtest binaries in CI. 
+func TestNewBackendWatcher_ValidInputs(t *testing.T) { + t.Parallel() + + // Create a basic REST config (doesn't need to connect to real cluster) + cfg := &rest.Config{ + Host: "https://localhost:6443", + } + + registry := vmcp.NewDynamicRegistry([]vmcp.Backend{ + { + ID: "test-backend", + Name: "Test Backend", + }, + }) + + mgr, err := k8s.NewBackendWatcher(cfg, "default", "default/test-group", registry) + require.NoError(t, err) + assert.NotNil(t, mgr) +} + +// TestBackendWatcher_WaitForCacheSync_NotStarted tests that WaitForCacheSync returns false +// when called before the watcher is started +func TestBackendWatcher_WaitForCacheSync_NotStarted(t *testing.T) { + t.Parallel() + + cfg := &rest.Config{ + Host: "https://localhost:6443", + } + + registry := vmcp.NewDynamicRegistry([]vmcp.Backend{}) + mgr, err := k8s.NewBackendWatcher(cfg, "default", "default/test-group", registry) + require.NoError(t, err) + + // Try to wait for cache sync without starting manager + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + synced := mgr.WaitForCacheSync(ctx) + assert.False(t, synced, "Cache sync should fail when watcher not started") +} + +// TestBackendWatcher_StartValidation tests that Start can be called and respects context +func TestBackendWatcher_StartValidation(t *testing.T) { + t.Parallel() + + cfg := &rest.Config{ + Host: "https://localhost:6443", + } + + registry := vmcp.NewDynamicRegistry([]vmcp.Backend{}) + mgr, err := k8s.NewBackendWatcher(cfg, "default", "default/test-group", registry) + require.NoError(t, err) + + // Start watcher in background with a short timeout + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + // Start will exit when context times out (no real cluster to connect to) + // This validates the watcher respects context cancellation + err = mgr.Start(ctx) + + // Either nil (graceful exit) or error (connection failure) are both acceptable + // The important thing is it doesn't hang + t.Logf("Start returned: %v", err) +} + +// mockBackendWatcherForTest is a simple mock for testing readiness endpoint behavior +type mockBackendWatcherForTest struct { + cacheSynced bool + syncCalled bool +} + +func (m *mockBackendWatcherForTest) WaitForCacheSync(_ context.Context) bool { + m.syncCalled = true + return m.cacheSynced +} + +// TestMockBackendWatcher_InterfaceCompliance verifies the mock implements the interface +func TestMockBackendWatcher_InterfaceCompliance(t *testing.T) { + t.Parallel() + + var _ interface { + WaitForCacheSync(ctx context.Context) bool + } = (*mockBackendWatcherForTest)(nil) +} + +// TestMockBackendWatcher_CacheSynced tests mock watcher behavior when cache is synced +func TestMockBackendWatcher_CacheSynced(t *testing.T) { + t.Parallel() + + mock := &mockBackendWatcherForTest{cacheSynced: true} + + ctx := context.Background() + synced := mock.WaitForCacheSync(ctx) + + assert.True(t, synced) + assert.True(t, mock.syncCalled, "WaitForCacheSync should have been called") +} + +// TestMockBackendWatcher_CacheNotSynced tests mock watcher behavior when cache is not synced +func TestMockBackendWatcher_CacheNotSynced(t *testing.T) { + t.Parallel() + + mock := &mockBackendWatcherForTest{cacheSynced: false} + + ctx := context.Background() + synced := mock.WaitForCacheSync(ctx) + + assert.False(t, synced) + assert.True(t, mock.syncCalled, "WaitForCacheSync should have been called") +} + +// TestBackendWatcher_Lifecycle documents the expected lifecycle without requiring 
real cluster +func TestBackendWatcher_Lifecycle(t *testing.T) { + t.Parallel() + + // This test documents the expected watcher lifecycle: + // 1. Create watcher with NewBackendWatcher + // 2. Start watcher in background goroutine + // 3. Wait for cache sync before serving requests + // 4. Cancel context to trigger graceful shutdown + + t.Run("documentation", func(t *testing.T) { + t.Parallel() + + // Example lifecycle (documented, not executed): + expectedLifecycle := ` + // Create watcher + cfg, _ := rest.InClusterConfig() + registry := vmcp.NewDynamicRegistry(backends) + watcher, _ := k8s.NewBackendWatcher(cfg, "default", "default/my-group", registry) + + // Start in background + ctx, cancel := context.WithCancel(context.Background()) + go watcher.Start(ctx) + + // Wait for cache sync (for readiness probe) + syncCtx, syncCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer syncCancel() + if !watcher.WaitForCacheSync(syncCtx) { + return fmt.Errorf("cache sync failed") + } + + // Server is ready to serve requests + // ... + + // Graceful shutdown + cancel() + ` + assert.NotEmpty(t, expectedLifecycle) + }) +} + +// TestBackendWatcher_ContextCancellation tests that context cancellation is respected +func TestBackendWatcher_ContextCancellation(t *testing.T) { + t.Parallel() + + cfg := &rest.Config{ + Host: "https://localhost:6443", + } + + registry := vmcp.NewDynamicRegistry([]vmcp.Backend{}) + mgr, err := k8s.NewBackendWatcher(cfg, "default", "default/test-group", registry) + require.NoError(t, err) + + // Create a context that's already cancelled + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + // Start should exit quickly when context is already cancelled + // This validates the watcher respects pre-cancelled contexts + startTime := time.Now() + err = mgr.Start(ctx) + duration := time.Since(startTime) + + // Should exit quickly (within 1 second) + assert.Less(t, duration, time.Second, "Start should exit quickly with cancelled context") + + // Either nil (graceful exit) or error (context cancelled) are acceptable + t.Logf("Start returned in %v: %v", duration, err) +} diff --git a/pkg/vmcp/server/mocks/mock_watcher.go b/pkg/vmcp/server/mocks/mock_watcher.go new file mode 100644 index 0000000000..6bfdac7f0b --- /dev/null +++ b/pkg/vmcp/server/mocks/mock_watcher.go @@ -0,0 +1,55 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: server.go +// +// Generated by this command: +// +// mockgen -destination=mocks/mock_watcher.go -package=mocks -source=server.go Watcher +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockWatcher is a mock of Watcher interface. +type MockWatcher struct { + ctrl *gomock.Controller + recorder *MockWatcherMockRecorder + isgomock struct{} +} + +// MockWatcherMockRecorder is the mock recorder for MockWatcher. +type MockWatcherMockRecorder struct { + mock *MockWatcher +} + +// NewMockWatcher creates a new mock instance. +func NewMockWatcher(ctrl *gomock.Controller) *MockWatcher { + mock := &MockWatcher{ctrl: ctrl} + mock.recorder = &MockWatcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder { + return m.recorder +} + +// WaitForCacheSync mocks base method. 
+func (m *MockWatcher) WaitForCacheSync(ctx context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WaitForCacheSync", ctx) + ret0, _ := ret[0].(bool) + return ret0 +} + +// WaitForCacheSync indicates an expected call of WaitForCacheSync. +func (mr *MockWatcherMockRecorder) WaitForCacheSync(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForCacheSync", reflect.TypeOf((*MockWatcher)(nil).WaitForCacheSync), ctx) +} diff --git a/pkg/vmcp/server/readiness_test.go b/pkg/vmcp/server/readiness_test.go new file mode 100644 index 0000000000..cf27ac63d3 --- /dev/null +++ b/pkg/vmcp/server/readiness_test.go @@ -0,0 +1,252 @@ +package server_test + +import ( + "context" + "encoding/json" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/stacklok/toolhive/pkg/networking" + "github.com/stacklok/toolhive/pkg/vmcp" + "github.com/stacklok/toolhive/pkg/vmcp/aggregator" + discoveryMocks "github.com/stacklok/toolhive/pkg/vmcp/discovery/mocks" + "github.com/stacklok/toolhive/pkg/vmcp/mocks" + "github.com/stacklok/toolhive/pkg/vmcp/router" + "github.com/stacklok/toolhive/pkg/vmcp/server" + serverMocks "github.com/stacklok/toolhive/pkg/vmcp/server/mocks" +) + +// ReadinessResponse mirrors the server's readiness response structure for test deserialization. +type ReadinessResponse struct { + Status string `json:"status"` + Mode string `json:"mode"` + Reason string `json:"reason,omitempty"` +} + +func TestReadinessEndpoint_StaticMode(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + mockBackendClient := mocks.NewMockBackendClient(ctrl) + mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) + rt := router.NewDefaultRouter() + + port := networking.FindAvailable() + require.NotZero(t, port, "Failed to find available port") + + mockDiscoveryMgr.EXPECT(). + Discover(gomock.Any(), gomock.Any()). + Return(&aggregator.AggregatedCapabilities{ + Tools: []vmcp.Tool{}, + Resources: []vmcp.Resource{}, + Prompts: []vmcp.Prompt{}, + RoutingTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + Metadata: &aggregator.AggregationMetadata{}, + }, nil). 
+ AnyTimes() + mockDiscoveryMgr.EXPECT().Stop().AnyTimes() + + ctx, cancel := context.WithCancel(t.Context()) + + // Create server without Watcher (static mode) + srv, err := server.New(ctx, &server.Config{ + Name: "test-vmcp", + Version: "1.0.0", + Host: "127.0.0.1", + Port: port, + Watcher: nil, // Static mode + }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewImmutableRegistry([]vmcp.Backend{}), nil) + require.NoError(t, err) + + t.Cleanup(cancel) + errCh := make(chan error, 1) + go func() { + if err := srv.Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-srv.Ready(): + case err := <-errCh: + t.Fatalf("Server failed to start: %v", err) + case <-time.After(5 * time.Second): + t.Fatalf("Server did not become ready within 5s") + } + + time.Sleep(10 * time.Millisecond) + + // Test /readyz endpoint in static mode + resp, err := http.Get("http://" + srv.Address() + "/readyz") + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode, "Static mode should always return 200 OK") + + var readiness ReadinessResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&readiness)) + assert.Equal(t, "ready", readiness.Status) + assert.Equal(t, "static", readiness.Mode) +} + +func TestReadinessEndpoint_DynamicMode_CacheSynced(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + mockBackendClient := mocks.NewMockBackendClient(ctrl) + mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) + rt := router.NewDefaultRouter() + + port := networking.FindAvailable() + require.NotZero(t, port, "Failed to find available port") + + mockDiscoveryMgr.EXPECT(). + Discover(gomock.Any(), gomock.Any()). + Return(&aggregator.AggregatedCapabilities{ + Tools: []vmcp.Tool{}, + Resources: []vmcp.Resource{}, + Prompts: []vmcp.Prompt{}, + RoutingTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + Metadata: &aggregator.AggregationMetadata{}, + }, nil). 
+ AnyTimes() + mockDiscoveryMgr.EXPECT().Stop().AnyTimes() + + ctx, cancel := context.WithCancel(t.Context()) + + // Create mock watcher with cache synced + mockWatcher := serverMocks.NewMockWatcher(ctrl) + mockWatcher.EXPECT().WaitForCacheSync(gomock.Any()).Return(true).AnyTimes() + + srv, err := server.New(ctx, &server.Config{ + Name: "test-vmcp", + Version: "1.0.0", + Host: "127.0.0.1", + Port: port, + Watcher: mockWatcher, // Dynamic mode with synced cache + }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewDynamicRegistry([]vmcp.Backend{}), nil) + require.NoError(t, err) + + t.Cleanup(cancel) + errCh := make(chan error, 1) + go func() { + if err := srv.Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-srv.Ready(): + case err := <-errCh: + t.Fatalf("Server failed to start: %v", err) + case <-time.After(5 * time.Second): + t.Fatalf("Server did not become ready within 5s") + } + + time.Sleep(10 * time.Millisecond) + + // Test /readyz endpoint in dynamic mode with synced cache + resp, err := http.Get("http://" + srv.Address() + "/readyz") + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode, "Dynamic mode with synced cache should return 200 OK") + + var readiness ReadinessResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&readiness)) + assert.Equal(t, "ready", readiness.Status) + assert.Equal(t, "dynamic", readiness.Mode) +} + +func TestReadinessEndpoint_DynamicMode_CacheNotSynced(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + t.Cleanup(ctrl.Finish) + + mockBackendClient := mocks.NewMockBackendClient(ctrl) + mockDiscoveryMgr := discoveryMocks.NewMockManager(ctrl) + rt := router.NewDefaultRouter() + + port := networking.FindAvailable() + require.NotZero(t, port, "Failed to find available port") + + mockDiscoveryMgr.EXPECT(). + Discover(gomock.Any(), gomock.Any()). + Return(&aggregator.AggregatedCapabilities{ + Tools: []vmcp.Tool{}, + Resources: []vmcp.Resource{}, + Prompts: []vmcp.Prompt{}, + RoutingTable: &vmcp.RoutingTable{ + Tools: make(map[string]*vmcp.BackendTarget), + Resources: make(map[string]*vmcp.BackendTarget), + Prompts: make(map[string]*vmcp.BackendTarget), + }, + Metadata: &aggregator.AggregationMetadata{}, + }, nil). 
+ AnyTimes() + mockDiscoveryMgr.EXPECT().Stop().AnyTimes() + + ctx, cancel := context.WithCancel(t.Context()) + + // Create mock watcher with cache NOT synced + mockWatcher := serverMocks.NewMockWatcher(ctrl) + mockWatcher.EXPECT().WaitForCacheSync(gomock.Any()).Return(false).AnyTimes() + + srv, err := server.New(ctx, &server.Config{ + Name: "test-vmcp", + Version: "1.0.0", + Host: "127.0.0.1", + Port: port, + Watcher: mockWatcher, // Dynamic mode with unsynced cache + }, rt, mockBackendClient, mockDiscoveryMgr, vmcp.NewDynamicRegistry([]vmcp.Backend{}), nil) + require.NoError(t, err) + + t.Cleanup(cancel) + errCh := make(chan error, 1) + go func() { + if err := srv.Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-srv.Ready(): + case err := <-errCh: + t.Fatalf("Server failed to start: %v", err) + case <-time.After(5 * time.Second): + t.Fatalf("Server did not become ready within 5s") + } + + time.Sleep(10 * time.Millisecond) + + // Test /readyz endpoint in dynamic mode with unsynced cache + resp, err := http.Get("http://" + srv.Address() + "/readyz") + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode, "Dynamic mode with unsynced cache should return 503") + + var readiness ReadinessResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&readiness)) + assert.Equal(t, "not_ready", readiness.Status) + assert.Equal(t, "dynamic", readiness.Mode) + assert.Equal(t, "cache_sync_pending", readiness.Reason) +} diff --git a/pkg/vmcp/server/server.go b/pkg/vmcp/server/server.go index 9c5ef94184..796ff849f4 100644 --- a/pkg/vmcp/server/server.go +++ b/pkg/vmcp/server/server.go @@ -56,6 +56,17 @@ const ( defaultSessionTTL = 30 * time.Minute ) +//go:generate mockgen -destination=mocks/mock_watcher.go -package=mocks -source=server.go Watcher + +// Watcher is the interface for Kubernetes backend watcher integration. +// Used in dynamic mode (outgoingAuth.source: discovered) to gate readiness +// on controller-runtime cache sync before serving requests. +type Watcher interface { + // WaitForCacheSync waits for the Kubernetes informer caches to sync. + // Returns true if caches synced successfully, false on timeout or error. + WaitForCacheSync(ctx context.Context) bool +} + // Config holds the Virtual MCP Server configuration. type Config struct { // Name is the server name exposed in MCP protocol @@ -102,6 +113,11 @@ type Config struct { // HealthMonitorConfig is the optional health monitoring configuration. // If nil, health monitoring is disabled. HealthMonitorConfig *health.MonitorConfig + + // Watcher is the optional Kubernetes backend watcher for dynamic mode. + // Only set when running in K8s with outgoingAuth.source: discovered. + // Used for /readyz endpoint to gate readiness on cache sync. + Watcher Watcher } // Server is the Virtual MCP Server that aggregates multiple backends. @@ -436,6 +452,7 @@ func (s *Server) Start(ctx context.Context) error { // Unauthenticated health endpoints mux.HandleFunc("/health", s.handleHealth) mux.HandleFunc("/ping", s.handleHealth) + mux.HandleFunc("/readyz", s.handleReadiness) mux.HandleFunc("/status", s.handleStatus) mux.HandleFunc("/api/backends/health", s.handleBackendHealth) @@ -661,6 +678,76 @@ func (*Server) handleHealth(w http.ResponseWriter, _ *http.Request) { } } +// handleReadiness handles /readyz HTTP requests for Kubernetes readiness probes. 
+// +// In dynamic mode (K8s with outgoingAuth.source: discovered), this endpoint gates +// readiness on the controller-runtime manager's cache sync status. The pod will +// not be marked ready until the manager has populated its cache with current +// backend information from the MCPGroup. +// +// In static mode (CLI or K8s with inline backends), this always returns 200 OK +// since there's no cache to sync. +// +// Design Pattern: +// This follows the same readiness gating pattern used by cert-manager and ArgoCD: +// - /health: Always returns 200 if server is responding (liveness probe) +// - /readyz: Returns 503 until caches synced, then 200 (readiness probe) +// +// K8s Configuration: +// +// readinessProbe: +// httpGet: +// path: /readyz +// port: 4483 +// initialDelaySeconds: 5 +// periodSeconds: 5 +// timeoutSeconds: 5 +func (s *Server) handleReadiness(w http.ResponseWriter, r *http.Request) { + // Static mode: always ready (no watcher, no cache to sync) + if s.config.Watcher == nil { + response := map[string]string{ + "status": "ready", + "mode": "static", + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Failed to encode readiness response: %v", err) + } + return + } + + // Dynamic mode: gate readiness on cache sync + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + if !s.config.Watcher.WaitForCacheSync(ctx) { + // Cache not synced yet - return 503 Service Unavailable + response := map[string]string{ + "status": "not_ready", + "mode": "dynamic", + "reason": "cache_sync_pending", + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusServiceUnavailable) + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Failed to encode readiness response: %v", err) + } + return + } + + // Cache synced - ready to serve requests + response := map[string]string{ + "status": "ready", + "mode": "dynamic", + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(response); err != nil { + logger.Errorf("Failed to encode readiness response: %v", err) + } +} + // SessionManager returns the session manager instance. // This is useful for testing and monitoring. func (s *Server) SessionManager() *transportsession.Manager { diff --git a/pkg/vmcp/workloads/k8s.go b/pkg/vmcp/workloads/k8s.go index 20feb6e595..fdcc751bac 100644 --- a/pkg/vmcp/workloads/k8s.go +++ b/pkg/vmcp/workloads/k8s.go @@ -21,7 +21,7 @@ import ( ) // k8sDiscoverer is a direct implementation of Discoverer for Kubernetes workloads. -// It uses the Kubernetes client directly to query MCPServer CRDs instead of going through k8s.Manager. +// It uses the Kubernetes client directly to query MCPServer CRDs instead of going through k8s.BackendWatcher. 
type k8sDiscoverer struct { k8sClient client.Client namespace string diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_auth_discovery_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_auth_discovery_test.go index d89a8c725a..3d070501c6 100644 --- a/test/e2e/thv-operator/virtualmcp/virtualmcp_auth_discovery_test.go +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_auth_discovery_test.go @@ -1095,6 +1095,16 @@ with socketserver.TCPServer(("", PORT), OIDCHandler) as httpd: var vmcpNodePort int32 BeforeAll(func() { + By("Verifying VirtualMCPServer is still ready") + WaitForVirtualMCPServerReady(ctx, k8sClient, vmcpServerName, testNamespace, timeout, pollingInterval) + + By("Verifying vMCP pods are still running and ready") + vmcpLabels := map[string]string{ + "app.kubernetes.io/name": "virtualmcpserver", + "app.kubernetes.io/instance": vmcpServerName, + } + WaitForPodsReady(ctx, k8sClient, testNamespace, vmcpLabels, timeout, pollingInterval) + By("Getting NodePort for VirtualMCPServer") Eventually(func() error { service := &corev1.Service{} diff --git a/test/e2e/thv-operator/virtualmcp/virtualmcp_lifecycle_test.go b/test/e2e/thv-operator/virtualmcp/virtualmcp_lifecycle_test.go index a1d38b496c..03493cc6e8 100644 --- a/test/e2e/thv-operator/virtualmcp/virtualmcp_lifecycle_test.go +++ b/test/e2e/thv-operator/virtualmcp/virtualmcp_lifecycle_test.go @@ -2,7 +2,9 @@ package virtualmcp import ( "context" + "encoding/json" "fmt" + "io" "net/http" "strings" "time" @@ -11,8 +13,10 @@ import ( "github.com/mark3labs/mcp-go/mcp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1" "github.com/stacklok/toolhive/test/e2e/images" @@ -583,3 +587,373 @@ var _ = Describe("VirtualMCPServer Lifecycle - DynamicRegistry", Ordered, Pendin }) }) }) + +// ReadinessResponse represents the /readyz endpoint response +type ReadinessResponse struct { + Status string `json:"status"` + Mode string `json:"mode"` + Reason string `json:"reason,omitempty"` +} + +// VirtualMCPServer K8s Manager Infrastructure Tests +// These tests verify the K8s manager integration that was implemented as part of THV-2884. +// Unlike the dynamic backend tests above (which are Pending until watcher is implemented), +// these tests verify the infrastructure is in place: manager creation, readiness probes, +// and endpoint behavior. 
+var _ = Describe("VirtualMCPServer K8s Manager Infrastructure", Ordered, func() { + var ( + testNamespace = "default" + mcpGroupName = "test-k8s-manager-infra-group" + vmcpServerName = "test-vmcp-k8s-manager-infra" + backendName = "backend-k8s-manager-infra-fetch" + timeout = 3 * time.Minute + pollingInterval = 2 * time.Second + vmcpNodePort int32 + ) + + BeforeAll(func() { + By("Creating MCPGroup for K8s manager infrastructure tests") + CreateMCPGroupAndWait(ctx, k8sClient, mcpGroupName, testNamespace, + "Test MCP Group for K8s manager infrastructure E2E tests", timeout, pollingInterval) + + By("Creating backend MCPServer") + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: backendName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.MCPServerSpec{ + GroupRef: mcpGroupName, + Image: images.GofetchServerImage, + Transport: "streamable-http", + ProxyPort: 8080, + McpPort: 8080, + }, + } + Expect(k8sClient.Create(ctx, backend)).To(Succeed()) + + By("Waiting for backend MCPServer to be ready") + Eventually(func() error { + server := &mcpv1alpha1.MCPServer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: backendName, + Namespace: testNamespace, + }, server) + if err != nil { + return fmt.Errorf("failed to get server: %w", err) + } + + if server.Status.Phase == mcpv1alpha1.MCPServerPhaseRunning { + return nil + } + return fmt.Errorf("backend not ready yet, phase: %s", server.Status.Phase) + }, timeout, pollingInterval).Should(Succeed(), "Backend should be ready") + + By("Creating VirtualMCPServer with discovered auth source (dynamic mode)") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + Spec: mcpv1alpha1.VirtualMCPServerSpec{ + GroupRef: mcpv1alpha1.GroupRef{ + Name: mcpGroupName, + }, + IncomingAuth: &mcpv1alpha1.IncomingAuthConfig{ + Type: "anonymous", + }, + OutgoingAuth: &mcpv1alpha1.OutgoingAuthConfig{ + Source: "discovered", // This triggers K8s manager creation + }, + Aggregation: &mcpv1alpha1.AggregationConfig{ + ConflictResolution: "prefix", + }, + ServiceType: "NodePort", + }, + } + Expect(k8sClient.Create(ctx, vmcpServer)).To(Succeed()) + + By("Waiting for VirtualMCPServer to be ready") + WaitForVirtualMCPServerReady(ctx, k8sClient, vmcpServerName, testNamespace, timeout, pollingInterval) + + By("Getting NodePort for VirtualMCPServer") + vmcpNodePort = GetVMCPNodePort(ctx, k8sClient, vmcpServerName, testNamespace, timeout, pollingInterval) + + By(fmt.Sprintf("VirtualMCPServer is ready on NodePort: %d", vmcpNodePort)) + + By("Waiting for VirtualMCPServer to be accessible") + Eventually(func() error { + httpClient := &http.Client{Timeout: 5 * time.Second} + url := fmt.Sprintf("http://localhost:%d/health", vmcpNodePort) + resp, err := httpClient.Get(url) + if err != nil { + return fmt.Errorf("health check failed: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + return nil + }, 30*time.Second, 2*time.Second).Should(Succeed(), "VirtualMCPServer health endpoint should be accessible") + }) + + AfterAll(func() { + By("Cleaning up VirtualMCPServer") + vmcpServer := &mcpv1alpha1.VirtualMCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: vmcpServerName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, vmcpServer) + + By("Cleaning up backend MCPServer") + backend := &mcpv1alpha1.MCPServer{ + ObjectMeta: metav1.ObjectMeta{ + Name: 
backendName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, backend) + + By("Cleaning up MCPGroup") + group := &mcpv1alpha1.MCPGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: mcpGroupName, + Namespace: testNamespace, + }, + } + _ = k8sClient.Delete(ctx, group) + }) + + Context("Readiness Probe Integration", func() { + It("should expose /readyz endpoint", func() { + vmcpURL := fmt.Sprintf("http://localhost:%d", vmcpNodePort) + + By("Checking /readyz endpoint is accessible") + Eventually(func() error { + resp, err := http.Get(vmcpURL + "/readyz") + if err != nil { + return fmt.Errorf("failed to connect to /readyz: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body)) + } + + return nil + }, 2*time.Minute, 5*time.Second).Should(Succeed(), "/readyz should return 200 OK") + }) + + It("should return dynamic mode status", func() { + vmcpURL := fmt.Sprintf("http://localhost:%d", vmcpNodePort) + + By("Getting /readyz response") + resp, err := http.Get(vmcpURL + "/readyz") + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + By("Parsing readiness response") + var readiness ReadinessResponse + err = json.NewDecoder(resp.Body).Decode(&readiness) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying dynamic mode is enabled") + Expect(readiness.Status).To(Equal("ready"), "Status should be ready") + Expect(readiness.Mode).To(Equal("dynamic"), "Mode should be dynamic since outgoingAuth.source is 'discovered'") + }) + + It("should indicate cache sync in dynamic mode", func() { + vmcpURL := fmt.Sprintf("http://localhost:%d", vmcpNodePort) + + By("Verifying cache is synced") + resp, err := http.Get(vmcpURL + "/readyz") + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + var readiness ReadinessResponse + err = json.NewDecoder(resp.Body).Decode(&readiness) + Expect(err).NotTo(HaveOccurred()) + + // In dynamic mode with synced cache, status should be "ready" + Expect(readiness.Status).To(Equal("ready")) + Expect(readiness.Mode).To(Equal("dynamic")) + // Reason should be empty when ready + Expect(readiness.Reason).To(BeEmpty()) + }) + }) + + Context("K8s Manager Lifecycle", func() { + It("should start with K8s manager running", func() { + By("Verifying pod is running") + Eventually(func() error { + pods := &corev1.PodList{} + err := k8sClient.List(ctx, pods, + ctrlclient.InNamespace(testNamespace), + ctrlclient.MatchingLabels{"app.kubernetes.io/instance": vmcpServerName}) + if err != nil { + return fmt.Errorf("failed to list pods: %w", err) + } + + if len(pods.Items) == 0 { + return fmt.Errorf("no pods found") + } + + pod := pods.Items[0] + if pod.Status.Phase != corev1.PodRunning { + return fmt.Errorf("pod not running yet, phase: %s", pod.Status.Phase) + } + + // Check pod is ready + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady { + if condition.Status != corev1.ConditionTrue { + return fmt.Errorf("pod not ready: %s", condition.Message) + } + return nil + } + } + + return fmt.Errorf("pod ready condition not found") + }, timeout, pollingInterval).Should(Succeed(), "Pod should be running and ready") + }) + + It("should have healthy container status", func() { + By("Getting pod name") + pods := &corev1.PodList{} + err := k8sClient.List(ctx, pods, + ctrlclient.InNamespace(testNamespace), + 
ctrlclient.MatchingLabels{"app.kubernetes.io/instance": vmcpServerName}) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).NotTo(BeEmpty(), "Should have at least one pod") + + podName := pods.Items[0].Name + + By("Checking container status") + Eventually(func() error { + pod := &corev1.Pod{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: podName, + Namespace: testNamespace, + }, pod) + if err != nil { + return err + } + + // Check all containers are ready + for _, status := range pod.Status.ContainerStatuses { + if !status.Ready { + return fmt.Errorf("container %s not ready", status.Name) + } + } + + return nil + }, timeout, pollingInterval).Should(Succeed(), "All containers should be ready") + }) + }) + + Context("Health Endpoints", func() { + It("should expose /health endpoint that always returns 200", func() { + vmcpURL := fmt.Sprintf("http://localhost:%d", vmcpNodePort) + + By("Checking /health endpoint") + resp, err := http.Get(vmcpURL + "/health") + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + var health map[string]string + err = json.NewDecoder(resp.Body).Decode(&health) + Expect(err).NotTo(HaveOccurred()) + Expect(health["status"]).To(Equal("ok")) + }) + + It("should distinguish between /health and /readyz", func() { + vmcpURL := fmt.Sprintf("http://localhost:%d", vmcpNodePort) + + By("Getting /health response") + healthResp, err := http.Get(vmcpURL + "/health") + Expect(err).NotTo(HaveOccurred()) + defer healthResp.Body.Close() + + By("Getting /readyz response") + readyResp, err := http.Get(vmcpURL + "/readyz") + Expect(err).NotTo(HaveOccurred()) + defer readyResp.Body.Close() + + // Both should return 200 when ready + Expect(healthResp.StatusCode).To(Equal(http.StatusOK)) + Expect(readyResp.StatusCode).To(Equal(http.StatusOK)) + + // Parse both responses + var health map[string]string + err = json.NewDecoder(healthResp.Body).Decode(&health) + Expect(err).NotTo(HaveOccurred()) + + var readiness ReadinessResponse + err = json.NewDecoder(readyResp.Body).Decode(&readiness) + Expect(err).NotTo(HaveOccurred()) + + // Health is simple status + Expect(health).To(HaveKey("status")) + Expect(health).NotTo(HaveKey("mode")) + + // Readiness includes mode information + Expect(readiness.Status).To(Equal("ready")) + Expect(readiness.Mode).To(Equal("dynamic")) + }) + }) + + Context("Status Endpoint", func() { + It("should expose /status endpoint with group reference", func() { + vmcpURL := fmt.Sprintf("http://localhost:%d", vmcpNodePort) + + By("Checking /status endpoint") + resp, err := http.Get(vmcpURL + "/status") + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + + var status map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&status) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying group_ref is present") + Expect(status).To(HaveKey("group_ref")) + groupRef, ok := status["group_ref"].(string) + Expect(ok).To(BeTrue()) + Expect(groupRef).To(ContainSubstring(mcpGroupName)) + }) + + It("should list discovered backends", func() { + vmcpURL := fmt.Sprintf("http://localhost:%d", vmcpNodePort) + + By("Getting /status response") + resp, err := http.Get(vmcpURL + "/status") + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + + var status map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&status) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying backends are listed") + 
Expect(status).To(HaveKey("backends")) + backends, ok := status["backends"].([]interface{}) + Expect(ok).To(BeTrue()) + Expect(backends).NotTo(BeEmpty(), "Should have at least one backend") + + // Verify backend structure + backend := backends[0].(map[string]interface{}) + Expect(backend).To(HaveKey("name")) + Expect(backend).To(HaveKey("health")) + Expect(backend).To(HaveKey("transport")) + }) + }) +})
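For reference while reviewing the TODO at the end of pkg/vmcp/k8s/manager.go, below is a minimal sketch of what the backend watch controller could look like, assuming a controller-runtime reconciler on MCPGroup. It is not part of this change: the backendSink interface and its Upsert/Remove signatures, the backendsFromGroup helper, and the setup function are illustrative assumptions. The real implementation would use vmcp.DynamicRegistry directly and diff the group's membership against the registry's existing backend IDs to handle removals.

// Package k8ssketch is a hypothetical sketch of the backend watch controller
// described in the TODO in pkg/vmcp/k8s/manager.go. Field layouts and method
// signatures marked below are assumptions for illustration only.
package k8ssketch

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
	"github.com/stacklok/toolhive/pkg/vmcp"
)

// backendSink is the narrow slice of registry behaviour the reconciler needs.
// vmcp.DynamicRegistry is expected to provide Upsert/Remove (per the TODO),
// but these exact signatures are assumed, not confirmed by this diff.
type backendSink interface {
	Upsert(backend vmcp.Backend)
	Remove(backendID string)
}

// mcpGroupReconciler watches the referenced MCPGroup and pushes backend
// changes into the registry, which bumps the registry version and lazily
// invalidates the discovery manager's capability cache.
type mcpGroupReconciler struct {
	client   client.Client
	groupRef string // "namespace/name" of the MCPGroup this vMCP serves
	registry backendSink
}

func (r *mcpGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Only the referenced group is interesting; other MCPGroups are ignored.
	if req.Namespace+"/"+req.Name != r.groupRef {
		return ctrl.Result{}, nil
	}

	var group mcpv1alpha1.MCPGroup
	if err := r.client.Get(ctx, req.NamespacedName, &group); err != nil {
		// Not-found means the group was deleted; a full implementation would
		// remove its backends from the registry here.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Translate the group's current membership into vmcp.Backend values and
	// upsert them; stale entries would be removed by diffing against the
	// registry's existing backend IDs.
	for _, b := range backendsFromGroup(&group) {
		r.registry.Upsert(b)
	}
	return ctrl.Result{}, nil
}

// backendsFromGroup is a placeholder: the MCPGroup spec/status shape is not
// part of this diff, so the real extraction logic is elided.
func backendsFromGroup(_ *mcpv1alpha1.MCPGroup) []vmcp.Backend {
	return nil
}

// setup wires the reconciler into the BackendWatcher's controller-runtime
// manager (mgr stands in for w.ctrlManager inside addBackendWatchController).
func setup(mgr ctrl.Manager, groupRef string, registry backendSink) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&mcpv1alpha1.MCPGroup{}).
		Complete(&mcpGroupReconciler{
			client:   mgr.GetClient(),
			groupRef: groupRef,
			registry: registry,
		})
}

Wiring the reconciler through addBackendWatchController keeps the watcher's stated single responsibility: observe the MCPGroup and update the registry, which in turn drives the version-based cache invalidation already exercised by the readiness and e2e tests above.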