diff --git a/cmd/bbr/main.go b/cmd/bbr/main.go index 84b1fffac..7fb94408c 100644 --- a/cmd/bbr/main.go +++ b/cmd/bbr/main.go @@ -18,26 +18,22 @@ package main import ( "flag" - "net" - "net/http" "os" - "strconv" "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus/promhttp" uberzap "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" healthPb "google.golang.org/grpc/health/grpc_health_v1" - "k8s.io/client-go/rest" - "k8s.io/component-base/metrics/legacyregistry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" + "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -50,9 +46,8 @@ var ( "grpcHealthPort", 9005, "The port used for gRPC liveness and readiness probes") - metricsPort = flag.Int( - "metricsPort", 9090, "The metrics port") - streaming = flag.Bool( + metricsAddr = flag.String("metrics-bind-address", ":9090", "The address the metric endpoint binds to.") + streaming = flag.Bool( "streaming", false, "Enables streaming support for Envoy full-duplex streaming mode") logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity") @@ -85,7 +80,18 @@ func run() error { return err } - mgr, err := ctrl.NewManager(cfg, ctrl.Options{}) + metrics.Register() + + // Register metrics handler. + // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. 
+ // More info: + // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server + // - https://book.kubebuilder.io/reference/metrics.html + metricsServerOptions := metricsserver.Options{ + BindAddress: *metricsAddr, + FilterProvider: filters.WithAuthenticationAndAuthorization, + } + mgr, err := ctrl.NewManager(cfg, ctrl.Options{Metrics: metricsServerOptions}) if err != nil { setupLog.Error(err, "Failed to create manager", "config", cfg) return err @@ -107,11 +113,6 @@ func run() error { return err } - // Register metrics handler. - if err := registerMetricsHandler(mgr, *metricsPort, cfg); err != nil { - return err - } - // Start the manager. This blocks until a signal is received. setupLog.Info("Manager starting") if err := mgr.Start(ctx); err != nil { @@ -152,58 +153,3 @@ func initLogging(opts *zap.Options) { logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller())) ctrl.SetLogger(logger) } - -const metricsEndpoint = "/metrics" - -// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager. -func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) error { - metrics.Register() - - // Init HTTP server. 
- h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg) - if err != nil { - return err - } - - mux := http.NewServeMux() - mux.Handle(metricsEndpoint, h) - - srv := &http.Server{ - Addr: net.JoinHostPort("", strconv.Itoa(port)), - Handler: mux, - } - - if err := mgr.Add(&manager.Server{ - Name: "metrics", - Server: srv, - }); err != nil { - setupLog.Error(err, "Failed to register metrics HTTP handler") - return err - } - return nil -} - -func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) { - h := promhttp.HandlerFor( - legacyregistry.DefaultGatherer, - promhttp.HandlerOpts{}, - ) - httpClient, err := rest.HTTPClientFor(cfg) - if err != nil { - setupLog.Error(err, "Failed to create http client for metrics auth") - return nil, err - } - - filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient) - if err != nil { - setupLog.Error(err, "Failed to create metrics filter for auth") - return nil, err - } - metricsLogger := ctrl.Log.WithName("metrics").WithValues("path", metricsEndpoint) - metricsAuthHandler, err := filter(metricsLogger, h) - if err != nil { - setupLog.Error(err, "Failed to create metrics auth handler") - return nil, err - } - return metricsAuthHandler, nil -} diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 9fd401d4e..452d36c1d 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -19,38 +19,29 @@ package main import ( "flag" "fmt" - "net" - "net/http" "os" - "strconv" "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus/promhttp" uberzap "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" healthPb "google.golang.org/grpc/health/grpc_health_v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" - "k8s.io/component-base/metrics/legacyregistry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver 
"sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -const ( - defaultMetricsEndpoint = "/metrics" -) - var ( grpcPort = flag.Int( "grpcPort", @@ -60,8 +51,7 @@ var ( "grpcHealthPort", 9003, "The port used for gRPC liveness and readiness probes") - metricsPort = flag.Int( - "metricsPort", 9090, "The metrics port") + metricsAddr = flag.String("metrics-bind-address", ":9090", "The address the metric endpoint binds to.") destinationEndpointHintKey = flag.String( "destinationEndpointHintKey", runserver.DefaultDestinationEndpointHintKey, @@ -143,11 +133,22 @@ func run() error { return err } + metrics.Register() + // Register metrics handler. + // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. 
+ // More info: + // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server + // - https://book.kubebuilder.io/reference/metrics.html + metricsServerOptions := metricsserver.Options{ + BindAddress: *metricsAddr, + FilterProvider: filters.WithAuthenticationAndAuthorization, + } + metrics.RecordInferenceExtensionInfo() // Keep recording build info, as the removed registerMetricsHandler did. poolNamespacedName := types.NamespacedName{ Name: *poolName, Namespace: *poolNamespace, } - mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg) + mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg, metricsServerOptions) if err != nil { setupLog.Error(err, "Failed to create controller manager") return err @@ -199,11 +200,6 @@ func run() error { return err } - // Register metrics handler. - if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil { - return err - } - // Start the manager. This blocks until a signal is received. setupLog.Info("Controller manager starting") if err := mgr.Start(ctx); err != nil { @@ -247,62 +243,6 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore. return nil } -// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager. -func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config, ds datastore.Datastore) error { - metrics.Register() - legacyregistry.CustomMustRegister(collectors.NewInferencePoolMetricsCollector(ds)) - - metrics.RecordInferenceExtensionInfo() - - // Init HTTP server. 
- h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg) - if err != nil { - return err - } - - mux := http.NewServeMux() - mux.Handle(defaultMetricsEndpoint, h) - - srv := &http.Server{ - Addr: net.JoinHostPort("", strconv.Itoa(port)), - Handler: mux, - } - - if err := mgr.Add(&manager.Server{ - Name: "metrics", - Server: srv, - }); err != nil { - setupLog.Error(err, "Failed to register metrics HTTP handler") - return err - } - return nil -} - -func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) { - h := promhttp.HandlerFor( - legacyregistry.DefaultGatherer, - promhttp.HandlerOpts{}, - ) - httpClient, err := rest.HTTPClientFor(cfg) - if err != nil { - setupLog.Error(err, "Failed to create http client for metrics auth") - return nil, err - } - - filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient) - if err != nil { - setupLog.Error(err, "Failed to create metrics filter for auth") - return nil, err - } - metricsLogger := ctrl.Log.WithName("metrics").WithValues("path", defaultMetricsEndpoint) - metricsAuthHandler, err := filter(metricsLogger, h) - if err != nil { - setupLog.Error(err, "Failed to create metrics auth handler") - return nil, err - } - return metricsAuthHandler, nil -} - func validateFlags() error { if *poolName == "" { return fmt.Errorf("required %q flag not set", "poolName") diff --git a/pkg/bbr/handlers/request_test.go b/pkg/bbr/handlers/request_test.go index 55c42a218..3bc0d6fe4 100644 --- a/pkg/bbr/handlers/request_test.go +++ b/pkg/bbr/handlers/request_test.go @@ -26,8 +26,8 @@ import ( extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/testing/protocmp" - "k8s.io/component-base/metrics/legacyregistry" metricsutils "k8s.io/component-base/metrics/testutil" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics" logutil 
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -204,7 +204,7 @@ func TestHandleRequestBody(t *testing.T) { bbr_success_total{} 1 ` - if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(wantMetrics), "inference_model_request_total"); err != nil { + if err := metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(wantMetrics), "inference_model_request_total"); err != nil { t.Error(err) } } diff --git a/pkg/bbr/metrics/metrics.go b/pkg/bbr/metrics/metrics.go index fc3538fba..88b2c1ca0 100644 --- a/pkg/bbr/metrics/metrics.go +++ b/pkg/bbr/metrics/metrics.go @@ -17,51 +17,49 @@ limitations under the License. package metrics import ( + "fmt" "sync" + "github.com/prometheus/client_golang/prometheus" compbasemetrics "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" + "sigs.k8s.io/controller-runtime/pkg/metrics" ) const component = "bbr" var ( - successCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: component, - Name: "success_total", - Help: "Count of successes pulling model name from body and injecting it in the request headers.", - StabilityLevel: compbasemetrics.ALPHA, + successCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: component, + Name: "success_total", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Count of successes pulling model name from body and injecting it in the request headers."), }, []string{}, ) - modelNotInBodyCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: component, - Name: "model_not_in_body_total", - Help: "Count of times the model was not present in the request body.", - StabilityLevel: compbasemetrics.ALPHA, + modelNotInBodyCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: component, + Name: "model_not_in_body_total", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Count of times the model was not 
present in the request body."), }, []string{}, ) - modelNotParsedCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: component, - Name: "model_not_parsed_total", - Help: "Count of times the model was in the request body but we could not parse it.", - StabilityLevel: compbasemetrics.ALPHA, + modelNotParsedCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: component, + Name: "model_not_parsed_total", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Count of times the model was in the request body but we could not parse it."), }, []string{}, ) // TODO: Uncomment and use this metrics once the core server implementation has handling to skip body parsing if header exists. /* - modelAlreadyPresentInHeaderCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ + modelAlreadyPresentInHeaderCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ Subsystem: component, Name: "model_already_present_in_header_total", Help: "Count of times the model was already present in request headers.", - StabilityLevel: compbasemetrics.ALPHA, }, []string{}, ) @@ -73,10 +71,10 @@ var registerMetrics sync.Once // Register all metrics. 
func Register() { registerMetrics.Do(func() { - legacyregistry.MustRegister(successCounter) - legacyregistry.MustRegister(modelNotInBodyCounter) - legacyregistry.MustRegister(modelNotParsedCounter) - // legacyregistry.MustRegister(modelAlreadyPresentInHeaderCounter) + metrics.Registry.MustRegister(successCounter) + metrics.Registry.MustRegister(modelNotInBodyCounter) + metrics.Registry.MustRegister(modelNotParsedCounter) + // metrics.Registry.MustRegister(modelAlreadyPresentInHeaderCounter) }) } diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go index 6cc0cdb83..64a20cda3 100644 --- a/pkg/epp/metrics/metrics.go +++ b/pkg/epp/metrics/metrics.go @@ -18,12 +18,15 @@ package metrics import ( "context" + "fmt" "sync" "time" + "github.com/prometheus/client_golang/prometheus" compbasemetrics "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/metrics" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -40,213 +43,216 @@ var ( var ( // Inference Model Metrics - requestCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: InferenceModelComponent, - Name: "request_total", - Help: "Counter of inference model requests broken out for each model and target model.", - StabilityLevel: compbasemetrics.ALPHA, + requestCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: InferenceModelComponent, + Name: "request_total", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Counter of inference model requests broken out for each model and target model."), }, []string{"model_name", "target_model_name"}, ) - requestErrCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: InferenceModelComponent, - Name: "request_error_total", - Help: "Counter of inference model requests errors broken out for each model and target model.", - StabilityLevel: 
compbasemetrics.ALPHA, + requestErrCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: InferenceModelComponent, + Name: "request_error_total", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Counter of inference model requests errors broken out for each model and target model."), }, []string{"model_name", "target_model_name", "error_code"}, ) - requestLatencies = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + requestLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "request_duration_seconds", - Help: "Inference model response latency distribution in seconds for each model and target model.", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Inference model response latency distribution in seconds for each model and target model."), Buckets: []float64{ 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3, 4, 5, 6, 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600, }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{"model_name", "target_model_name"}, ) - requestSizes = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + requestSizes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "request_sizes", - Help: "Inference model requests size distribution in bytes for each model and target model.", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Inference model requests size distribution in bytes for each model and target model."), // Use buckets ranging from 1000 bytes (1KB) to 10^9 bytes (1GB). 
Buckets: []float64{ 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, // More fine-grained up to 64KB 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608, // Exponential up to 8MB 16777216, 33554432, 67108864, 134217728, 268435456, 536870912, 1073741824, // Exponential up to 1GB }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{"model_name", "target_model_name"}, ) - responseSizes = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + responseSizes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "response_sizes", - Help: "Inference model responses size distribution in bytes for each model and target model.", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Inference model responses size distribution in bytes for each model and target model."), // Most models have a response token < 8192 tokens. Each token, in average, has 4 characters. // 8192 * 4 = 32768. - Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536}, - StabilityLevel: compbasemetrics.ALPHA, + Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536}, }, []string{"model_name", "target_model_name"}, ) - inputTokens = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + inputTokens = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "input_tokens", - Help: "Inference model input token count distribution for requests in each model.", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Inference model input token count distribution for requests in each model."), // Most models have a input context window less than 1 million tokens. 
- Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536, 131072, 262144, 524288, 1048576}, - StabilityLevel: compbasemetrics.ALPHA, + Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536, 131072, 262144, 524288, 1048576}, }, []string{"model_name", "target_model_name"}, ) - outputTokens = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + outputTokens = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "output_tokens", - Help: "Inference model output token count distribution for requests in each model.", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Inference model output token count distribution for requests in each model."), // Most models generates output less than 8192 tokens. - Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}, - StabilityLevel: compbasemetrics.ALPHA, + Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}, }, []string{"model_name", "target_model_name"}, ) - runningRequests = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferenceModelComponent, - Name: "running_requests", - Help: "Inference model number of running requests in each model.", - StabilityLevel: compbasemetrics.ALPHA, + runningRequests = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferenceModelComponent, + Name: "running_requests", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Inference model number of running requests in each model."), }, []string{"model_name"}, ) // NTPOT - Normalized Time Per Output Token - NormalizedTimePerOutputToken = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + NormalizedTimePerOutputToken = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "normalized_time_per_output_token_seconds", - Help: "Inference model latency divided by 
number of output tokens in seconds for each model and target model.", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Inference model latency divided by number of output tokens in seconds for each model and target model."), // From few milliseconds per token to multiple seconds per token Buckets: []float64{ 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{"model_name", "target_model_name"}, ) // Inference Pool Metrics - inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferencePoolComponent, - Name: "average_kv_cache_utilization", - Help: "The average kv cache utilization for an inference server pool.", - StabilityLevel: compbasemetrics.ALPHA, + inferencePoolAvgKVCache = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferencePoolComponent, + Name: "average_kv_cache_utilization", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "The average kv cache utilization for an inference server pool."), }, []string{"name"}, ) - inferencePoolAvgQueueSize = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferencePoolComponent, - Name: "average_queue_size", - Help: "The average number of requests pending in the model server queue.", - StabilityLevel: compbasemetrics.ALPHA, + inferencePoolAvgQueueSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferencePoolComponent, + Name: "average_queue_size", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "The average number of requests pending in the model server queue."), }, []string{"name"}, ) - inferencePoolReadyPods = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferencePoolComponent, - Name: "ready_pods", - Help: "The number of ready pods in the inference server pool.", - StabilityLevel: compbasemetrics.ALPHA, + inferencePoolReadyPods = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: 
InferencePoolComponent, + Name: "ready_pods", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "The number of ready pods in the inference server pool."), }, []string{"name"}, ) // Scheduler Metrics - SchedulerE2ELatency = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + SchedulerE2ELatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceExtension, Name: "scheduler_e2e_duration_seconds", - Help: "End-to-end scheduling latency distribution in seconds.", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "End-to-end scheduling latency distribution in seconds."), Buckets: []float64{ 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{}, ) - SchedulerPluginProcessingLatencies = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + SchedulerPluginProcessingLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceExtension, Name: "scheduler_plugin_duration_seconds", - Help: "Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name."), Buckets: []float64{ 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{"plugin_type", "plugin_name"}, ) // Info Metrics - InferenceExtensionInfo = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferenceExtension, - Name: "info", - Help: "General information of the current build of Inference Extension.", - StabilityLevel: compbasemetrics.ALPHA, + InferenceExtensionInfo = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferenceExtension, + Name: "info", + Help: fmt.Sprintf("[%v] %v", compbasemetrics.ALPHA, "General information of the current build of Inference 
Extension."), }, []string{"commit"}, ) ) - var registerMetrics sync.Once // Register all metrics. func Register() { registerMetrics.Do(func() { - legacyregistry.MustRegister(requestCounter) - legacyregistry.MustRegister(requestErrCounter) - legacyregistry.MustRegister(requestLatencies) - legacyregistry.MustRegister(requestSizes) - legacyregistry.MustRegister(responseSizes) - legacyregistry.MustRegister(inputTokens) - legacyregistry.MustRegister(outputTokens) - legacyregistry.MustRegister(runningRequests) - legacyregistry.MustRegister(NormalizedTimePerOutputToken) - - legacyregistry.MustRegister(inferencePoolAvgKVCache) - legacyregistry.MustRegister(inferencePoolAvgQueueSize) - legacyregistry.MustRegister(inferencePoolReadyPods) - - legacyregistry.MustRegister(SchedulerPluginProcessingLatencies) - legacyregistry.MustRegister(SchedulerE2ELatency) - - legacyregistry.MustRegister(InferenceExtensionInfo) + metrics.Registry.MustRegister(requestCounter) + metrics.Registry.MustRegister(requestErrCounter) + metrics.Registry.MustRegister(requestLatencies) + metrics.Registry.MustRegister(requestSizes) + metrics.Registry.MustRegister(responseSizes) + metrics.Registry.MustRegister(inputTokens) + metrics.Registry.MustRegister(outputTokens) + metrics.Registry.MustRegister(runningRequests) + metrics.Registry.MustRegister(NormalizedTimePerOutputToken) + + metrics.Registry.MustRegister(inferencePoolAvgKVCache) + metrics.Registry.MustRegister(inferencePoolAvgQueueSize) + metrics.Registry.MustRegister(inferencePoolReadyPods) + + metrics.Registry.MustRegister(SchedulerPluginProcessingLatencies) + metrics.Registry.MustRegister(SchedulerE2ELatency) + + metrics.Registry.MustRegister(InferenceExtensionInfo) }) } +// Reset resets every metric vector that Register registers. Just for integration test. +func Reset() { + requestCounter.Reset() + requestErrCounter.Reset() + requestLatencies.Reset() + requestSizes.Reset() + responseSizes.Reset() + inputTokens.Reset() + outputTokens.Reset() + runningRequests.Reset() + NormalizedTimePerOutputToken.Reset() 
+ inferencePoolAvgKVCache.Reset() + inferencePoolAvgQueueSize.Reset() + inferencePoolReadyPods.Reset() + SchedulerPluginProcessingLatencies.Reset() + SchedulerE2ELatency.Reset() + InferenceExtensionInfo.Reset() +} + // RecordRequstCounter records the number of requests. func RecordRequestCounter(modelName, targetModelName string) { requestCounter.WithLabelValues(modelName, targetModelName).Inc() diff --git a/pkg/epp/metrics/metrics_test.go b/pkg/epp/metrics/metrics_test.go index 3a8136a08..2366b6a94 100644 --- a/pkg/epp/metrics/metrics_test.go +++ b/pkg/epp/metrics/metrics_test.go @@ -22,8 +22,8 @@ import ( "testing" "time" - "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" + "sigs.k8s.io/controller-runtime/pkg/metrics" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -93,7 +93,7 @@ func TestRecordRequestCounterandSizes(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestTotal, RequestTotalMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRequestTotal, RequestTotalMetric); err != nil { t.Error(err) } wantRequestSizes, err := os.Open("testdata/request_sizes_metric") @@ -105,7 +105,7 @@ func TestRecordRequestCounterandSizes(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestSizes, RequestSizesMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRequestSizes, RequestSizesMetric); err != nil { t.Error(err) } }) @@ -165,7 +165,7 @@ func TestRecordRequestErrorCounter(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestErrorCounter, RequestErrorTotalMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, 
wantRequestErrorCounter, RequestErrorTotalMetric); err != nil { t.Error(err) } }) @@ -247,7 +247,7 @@ func TestRecordRequestLatencies(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestLatencies, RequestLatenciesMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRequestLatencies, RequestLatenciesMetric); err != nil { t.Error(err) } }) @@ -348,7 +348,7 @@ func TestRecordNormalizedTimePerOutputToken(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantLatencyPerToken, NormalizedTimePerOutputTokenMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantLatencyPerToken, NormalizedTimePerOutputTokenMetric); err != nil { t.Error(err) } }) @@ -416,7 +416,7 @@ func TestRecordResponseMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantResponseSize, ResponseSizesMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantResponseSize, ResponseSizesMetric); err != nil { t.Error(err) } @@ -429,7 +429,7 @@ func TestRecordResponseMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantInputToken, InputTokensMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantInputToken, InputTokensMetric); err != nil { t.Error(err) } @@ -442,7 +442,7 @@ func TestRecordResponseMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantOutputToken, OutputTokensMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantOutputToken, OutputTokensMetric); err != nil { t.Error(err) } }) @@ -502,7 +502,7 @@ func TestRunningRequestsMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := 
testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRunningRequests, RunningRequestsMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRunningRequests, RunningRequestsMetric); err != nil { t.Error(err) } }) @@ -538,7 +538,7 @@ func TestInferencePoolMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantKVCache, KVCacheAvgUsageMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantKVCache, KVCacheAvgUsageMetric); err != nil { t.Error(err) } @@ -551,7 +551,7 @@ func TestInferencePoolMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantQueueSize, QueueAvgSizeMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantQueueSize, QueueAvgSizeMetric); err != nil { t.Error(err) } }) @@ -615,7 +615,7 @@ func TestSchedulerPluginProcessingLatencies(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantPluginLatencies, "inference_extension_scheduler_plugin_duration_seconds"); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantPluginLatencies, "inference_extension_scheduler_plugin_duration_seconds"); err != nil { t.Error(err) } }) @@ -658,7 +658,7 @@ func TestSchedulerE2ELatency(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantE2ELatency, "inference_extension_scheduler_e2e_duration_seconds"); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantE2ELatency, "inference_extension_scheduler_e2e_duration_seconds"); err != nil { t.Error(err) } }) diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index e56682104..89e509696 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -30,6 +30,7 @@ 
import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" ) @@ -41,7 +42,7 @@ func init() { } // defaultManagerOptions returns the default options used to create the manager. -func defaultManagerOptions(namespacedName types.NamespacedName) ctrl.Options { +func defaultManagerOptions(namespacedName types.NamespacedName, metricsServerOptions metricsserver.Options) ctrl.Options { return ctrl.Options{ Scheme: scheme, Cache: cache.Options{ @@ -67,12 +68,13 @@ func defaultManagerOptions(namespacedName types.NamespacedName) ctrl.Options { }, }, }, + Metrics: metricsServerOptions, } } // NewDefaultManager creates a new controller manager with default configuration. -func NewDefaultManager(namespacedName types.NamespacedName, restConfig *rest.Config) (ctrl.Manager, error) { - manager, err := ctrl.NewManager(restConfig, defaultManagerOptions(namespacedName)) +func NewDefaultManager(namespacedName types.NamespacedName, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) { + manager, err := ctrl.NewManager(restConfig, defaultManagerOptions(namespacedName, metricsServerOptions)) if err != nil { return nil, fmt.Errorf("failed to create controller manager: %v", err) } diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 68787da72..428157746 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -24,8 +24,6 @@ import ( "errors" "fmt" "io" - "net" - "net/http" "os" "path/filepath" "strconv" @@ -37,7 +35,6 @@ import ( extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/google/go-cmp/cmp" - "github.com/prometheus/client_golang/prometheus/promhttp" 
"github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -51,22 +48,21 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" k8syaml "k8s.io/apimachinery/pkg/util/yaml" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/component-base/metrics/legacyregistry" metricsutils "k8s.io/component-base/metrics/testutil" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/envtest" - "sigs.k8s.io/controller-runtime/pkg/manager" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" @@ -75,8 +71,7 @@ import ( ) const ( - port = runserver.DefaultGrpcPort - metricsPort = 8889 + port = runserver.DefaultGrpcPort ) var ( @@ -1216,13 +1211,12 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { if len(test.wantMetrics) != 0 { for metricName, value := range test.wantMetrics { - if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(value), metricName); err 
!= nil { + if err := metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(value), metricName); err != nil { t.Error(err) } } } - - legacyregistry.Reset() + metrics.Reset() }) } } @@ -1338,15 +1332,21 @@ func BeforeSuite() func() { // Init runtime. ctrl.SetLogger(logger) - mgr, err := server.NewManagerWithOptions(cfg, managerTestOptions("default", "vllm-llama3-8b-instruct-pool")) + metrics.Register() + // Register metrics handler. + // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. + // More info: + // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server + // - https://book.kubebuilder.io/reference/metrics.html + metricsServerOptions := metricsserver.Options{ + BindAddress: ":8889", + FilterProvider: filters.WithAuthenticationAndAuthorization, + } + mgr, err := runserver.NewManagerWithOptions(cfg, managerTestOptions("default", "vllm-llama3-8b-instruct-pool", metricsServerOptions)) if err != nil { logutil.Fatal(logger, err, "Failed to create controller manager") } - if err := registerMetricsHandler(mgr, metricsPort); err != nil { - logutil.Fatal(logger, err, "Failed to register metrics handler") - } - serverRunner = runserver.NewDefaultExtProcServerRunner() serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) @@ -1443,41 +1443,13 @@ func makeMetadata(endpoint string) *structpb.Struct { } } -// registerMetricsHandler is a simplified version of metrics endpoint handler -// without Authentication for integration tests. -func registerMetricsHandler(mgr manager.Manager, port int) error { - metrics.Register() - - // Init HTTP server. 
- h := promhttp.HandlerFor( - legacyregistry.DefaultGatherer, - promhttp.HandlerOpts{}, - ) - - mux := http.NewServeMux() - mux.Handle("/metrics", h) - - srv := &http.Server{ - Addr: net.JoinHostPort("", strconv.Itoa(port)), - Handler: mux, - } - - if err := mgr.Add(&manager.Server{ - Name: "metrics", - Server: srv, - }); err != nil { - return err - } - return nil -} - // inject options that allow multiple test runs to run // https://github.com/kubernetes-sigs/controller-runtime/issues/2937 -func managerTestOptions(namespace, name string) ctrl.Options { +func managerTestOptions(namespace, name string, metricsServerOptions metricsserver.Options) ctrl.Options { return ctrl.Options{ Scheme: scheme, Cache: cache.Options{ - ByObject: map[client.Object]cache.ByObject{ + ByObject: map[k8sclient.Object]cache.ByObject{ &corev1.Pod{}: { Namespaces: map[string]cache.Config{ namespace: {}, @@ -1502,6 +1474,7 @@ func managerTestOptions(namespace, name string) ctrl.Options { Controller: config.Controller{ SkipNameValidation: boolPointer(true), }, + Metrics: metricsServerOptions, } }