Add prefix cache aware scheduling #768

Draft · wants to merge 2 commits into base: main

Changes from all commits
31 changes: 31 additions & 0 deletions cmd/epp/main.go
@@ -34,6 +34,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/component-base/metrics/legacyregistry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
@@ -42,7 +43,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

@@ -106,8 +109,31 @@ var (
"Prometheus metric for the LoRA info metrics (must be in vLLM label format).")

setupLog = ctrl.Log.WithName("setup")

// Environment variables
schedulerV2 = envutil.GetEnvString("EXPERIMENTAL_USE_SCHEDULER_V2", "false", setupLog)
)

func loadPrefixCacheConfig() prefix.Config {
baseLogger := log.Log.WithName("env-config")

return prefix.Config{
HashBlockSize: envutil.GetEnvInt("PREFIX_CACHE_HASH_BLOCK_SIZE", prefix.DefaultHashBlockSize, baseLogger),
MaxPrefixBlocksToMatch: envutil.GetEnvInt("PREFIX_CACHE_MAX_PREFIX_BLOCKS", prefix.DefaultMaxPrefixBlocks, baseLogger),
LRUIndexerCapacity: envutil.GetEnvInt("PREFIX_CACHE_LRU_CAPACITY", prefix.DefaultLRUIndexerCapacity, baseLogger),
}
}

func loadSchedulingScorerWeights() scheduling.ScorerWeights {
baseLogger := log.Log.WithName("env-config")

return scheduling.ScorerWeights{
Prefix: envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", 3, baseLogger),
Queue: envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", 2, baseLogger),
KVCache: envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", 1, baseLogger),
Comment on lines +131 to +133

Contributor: recommend to comment on the defaults and how they were selected.

Contributor: I also suggest defining the defaults in the config file.

Member: How were these weights chosen, based on testing or brainstorming?

}
}

func main() {
if err := run(); err != nil {
os.Exit(1)
@@ -171,6 +197,11 @@ func run() error {
datastore := datastore.NewDatastore(ctx, pmf)

scheduler := scheduling.NewScheduler(datastore)
if schedulerV2 == "true" {
schedConfig := scheduling.CreateConfig(loadSchedulingScorerWeights(), loadPrefixCacheConfig())
setupLog.Info("Creating scheduler", "config", *schedConfig)
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedConfig)
}
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
75 changes: 75 additions & 0 deletions pkg/epp/metrics/metrics.go
@@ -18,6 +18,7 @@ package metrics

import (
"context"
"runtime/debug"
"sync"
"time"

@@ -219,6 +220,40 @@ var (
},
[]string{"commit"},
)

// Prefix indexer Metrics
PrefixCacheSize = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Subsystem: InferenceExtension,
Name: "prefix_indexer_size",
Help: "Size of the prefix indexer.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{},
)

PrefixCacheHitRatio = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Subsystem: InferenceExtension,
Name: "prefix_indexer_hit_ratio",
Help: "Ratio of prefix length matched to total prefix length in the cache lookup.",
// Buckets from 0.0 to 1.0 in increments of 0.1
Buckets: []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{},
)

PrefixCacheHitLength = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Subsystem: InferenceExtension,
Name: "prefix_indexer_hit_bytes",
Help: "Length of the prefix match in number of bytes in the cache lookup.",
Buckets: []float64{0, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{},
)
)

var registerMetrics sync.Once
@@ -244,6 +279,10 @@ func Register() {
legacyregistry.MustRegister(SchedulerE2ELatency)

legacyregistry.MustRegister(InferenceExtensionInfo)

legacyregistry.MustRegister(PrefixCacheSize)
legacyregistry.MustRegister(PrefixCacheHitRatio)
legacyregistry.MustRegister(PrefixCacheHitLength)
})
}

@@ -352,8 +391,44 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
}

// RecordPrefixCacheSize records the size of the prefix indexer in megabytes.
func RecordPrefixCacheSize(size int64) {
PrefixCacheSize.WithLabelValues().Set(float64(size))
}

// RecordPrefixCacheMatch records both the hit ratio and hit length for a prefix indexer match.
// matchedLength is the number of characters that matched, and totalLength is the total prefix length.
func RecordPrefixCacheMatch(matchedLength, totalLength int) {
// Record the hit length metric
PrefixCacheHitLength.WithLabelValues().Observe(float64(matchedLength))

// Record the hit ratio metric if totalLength is positive
if totalLength > 0 {
ratio := float64(matchedLength) / float64(totalLength)
PrefixCacheHitRatio.WithLabelValues().Observe(ratio)
}
}
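
// Illustrative only (not part of this diff): a prefix-cache plugin would typically
// report both metrics right after a lookup, for example:
//
//	matched, total := len(matchedPrefix), len(promptPrefix) // hypothetical variable names
//	metrics.RecordPrefixCacheMatch(matched, total)
//	metrics.RecordPrefixCacheSize(indexer.Size()) // assumes the indexer exposes its current size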

func RecordInferenceExtensionInfo() {
if CommitSHA != "" {
InferenceExtensionInfo.WithLabelValues(CommitSHA).Set(1)
}
}

func init() {
Contributor: @JeffLuoo did you send a PR to populate the commit SHA? Is this really needed?

Contributor: Not sure why it is still here. It should have been deleted, because CommitSHA is no longer fetched from vcs.revision in the latest implementation: https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/pkg/epp/metrics/metrics.go.
The original task is #579, which adds the "info" metric for the inference gateway, starting with the version and hash of the build.
I personally think the version is more useful for determining the current build of the EPP; the SHA is useful if you are testing a specific change on top of main.

info, ok := debug.ReadBuildInfo()
if !ok {
return
}

var Commit = func(i *debug.BuildInfo) string {
for _, setting := range i.Settings {
if setting.Key == "vcs.revision" {
return setting.Value
}
}
return ""
}(info)

CommitSHA = Commit
}
103 changes: 103 additions & 0 deletions pkg/epp/metrics/metrics_test.go
@@ -663,3 +663,106 @@ func TestSchedulerE2ELatency(t *testing.T) {
})
}
}

func TestPrefixCacheMetrics(t *testing.T) {
const (
PrefixCacheSizeMetric = InferenceExtension + "_prefix_indexer_size"
PrefixCacheHitRatioMetric = InferenceExtension + "_prefix_indexer_hit_ratio"
PrefixCacheHitLengthMetric = InferenceExtension + "_prefix_indexer_hit_bytes"
)

type cacheMatchRecord struct {
matchedLength int
totalLength int
}

scenario := struct {
name string
cacheSizes []int64
cacheMatches []cacheMatchRecord
}{
name: "multiple cache metrics",
cacheSizes: []int64{1024, 2048, 4096},
cacheMatches: []cacheMatchRecord{
{
matchedLength: 5,
totalLength: 10,
},
{
matchedLength: 0,
totalLength: 10,
},
{
matchedLength: 10,
totalLength: 10,
},
{
matchedLength: 7,
totalLength: 10,
},
{
matchedLength: 64,
totalLength: 128,
},
{
matchedLength: 0,
totalLength: 128,
},
},
}

Register()
t.Run(scenario.name, func(t *testing.T) {
// Record cache size metrics
for _, size := range scenario.cacheSizes {
RecordPrefixCacheSize(size)
}

// Record cache match metrics (both hit ratio and hit length)
for _, match := range scenario.cacheMatches {
RecordPrefixCacheMatch(match.matchedLength, match.totalLength)
}
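
// With the inputs above, the golden files expect: hit lengths
// 5+0+10+7+64+0 = 86 bytes over 6 samples, hit ratios
// 0.5, 0, 1.0, 0.7, 0.5, 0 (sum 2.7), and a final gauge value of 4096,
// since RecordPrefixCacheSize sets (rather than adds) the size.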

// Verify cache size metrics
wantCacheSizeMetrics, err := os.Open("testdata/prefix_indexer_size_metric")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := wantCacheSizeMetrics.Close(); err != nil {
t.Error(err)
}
}()
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantCacheSizeMetrics, PrefixCacheSizeMetric); err != nil {
t.Error(err)
}

// Verify hit ratio metrics
wantHitRatioMetrics, err := os.Open("testdata/prefix_indexer_hit_ratio_metric")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := wantHitRatioMetrics.Close(); err != nil {
t.Error(err)
}
}()
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitRatioMetrics, PrefixCacheHitRatioMetric); err != nil {
t.Error(err)
}

// Verify hit length metrics
wantHitLengthMetrics, err := os.Open("testdata/prefix_indexer_hit_bytes_metric")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := wantHitLengthMetrics.Close(); err != nil {
t.Error(err)
}
}()
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitLengthMetrics, PrefixCacheHitLengthMetric); err != nil {
t.Error(err)
}
})
}
19 changes: 19 additions & 0 deletions pkg/epp/metrics/testdata/prefix_indexer_hit_bytes_metric
@@ -0,0 +1,19 @@
# HELP inference_extension_prefix_indexer_hit_bytes [ALPHA] Length of the prefix match in number of bytes in the cache lookup.
Collaborator: Can these be multiline string variables in the test file(s)? Reduces the need for these files/dir.

Collaborator: Prior art:
wantMetrics: map[string]string{`inference_model_input_tokens`: `

# TYPE inference_extension_prefix_indexer_hit_bytes histogram
inference_extension_prefix_indexer_hit_bytes_bucket{le="0"} 2
inference_extension_prefix_indexer_hit_bytes_bucket{le="16"} 5
inference_extension_prefix_indexer_hit_bytes_bucket{le="32"} 5
inference_extension_prefix_indexer_hit_bytes_bucket{le="64"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="128"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="256"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="512"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="1024"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="2048"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="4096"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="8192"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="16384"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="32768"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="65536"} 6
inference_extension_prefix_indexer_hit_bytes_bucket{le="+Inf"} 6
inference_extension_prefix_indexer_hit_bytes_sum 86
inference_extension_prefix_indexer_hit_bytes_count 6
16 changes: 16 additions & 0 deletions pkg/epp/metrics/testdata/prefix_indexer_hit_ratio_metric
@@ -0,0 +1,16 @@
# HELP inference_extension_prefix_indexer_hit_ratio [ALPHA] Ratio of prefix length matched to total prefix length in the cache lookup.
# TYPE inference_extension_prefix_indexer_hit_ratio histogram
inference_extension_prefix_indexer_hit_ratio_bucket{le="0"} 2
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.1"} 2
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.2"} 2
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.3"} 2
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.4"} 2
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.5"} 4
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.6"} 4
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.7"} 5
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.8"} 5
inference_extension_prefix_indexer_hit_ratio_bucket{le="0.9"} 5
inference_extension_prefix_indexer_hit_ratio_bucket{le="1"} 6
inference_extension_prefix_indexer_hit_ratio_bucket{le="+Inf"} 6
inference_extension_prefix_indexer_hit_ratio_sum 2.7
inference_extension_prefix_indexer_hit_ratio_count 6
3 changes: 3 additions & 0 deletions pkg/epp/metrics/testdata/prefix_indexer_size_metric
@@ -0,0 +1,3 @@
# HELP inference_extension_prefix_indexer_size [ALPHA] Size of the prefix indexer.
# TYPE inference_extension_prefix_indexer_size gauge
inference_extension_prefix_indexer_size{} 4096
67 changes: 67 additions & 0 deletions pkg/epp/scheduling/config_v2.go
@@ -0,0 +1,67 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package scheduling implements request scheduling algorithms.
package scheduling

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/prefix"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

func CreateConfig(weights ScorerWeights, prefixConfig prefix.Config) *SchedulerConfig {
Collaborator: What does the previous iteration of the config restrict? Creating a V2 as one of the initial adopters of this framework may set a bad precedent for reusability.

prefixPlugin := prefix.New(prefixConfig)
queuePlugin := &scorer.QueueScorer{}
kvCachePlugin := &scorer.KVCacheScorer{}
config := &SchedulerConfig{
PreSchedulePlugins: []plugins.PreSchedule{prefixPlugin},
PostSchedulePlugins: []plugins.PostSchedule{prefixPlugin},
Scorers: map[plugins.Scorer]int{
prefixPlugin: weights.Prefix,
queuePlugin: weights.Queue,
kvCachePlugin: weights.KVCache,
},
Filters: []plugins.Filter{&sheddableRequestFilterV2{}},
Picker: &picker.MaxScorePicker{},
}
return config
}

type ScorerWeights struct {
Prefix int
Queue int
KVCache int
}
Comment on lines +47 to +51

Contributor: Shouldn't this be defined with the extension itself?


type sheddableRequestFilterV2 struct{}

func (p *sheddableRequestFilterV2) Name() string {
return "sheddableRequestFilterV2"
}

func (p *sheddableRequestFilterV2) Filter(ctx *types.SchedulingContext, pods []types.Pod) []types.Pod {
if ctx.Req.Critical {
// Allow all pods to pass through if the request is critical, even if all pods reach their capacity.
return pods
}

// Only allow pods that have enough capacity to handle the request.
return filter.HasCapacityFilter.Filter(ctx, pods)
}
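
For reference, a minimal sketch of how this config is wired up, mirroring the cmd/epp/main.go change above and assuming the same imports; the weight and prefix values shown are simply the env-var defaults from that file:

weights := scheduling.ScorerWeights{Prefix: 3, Queue: 2, KVCache: 1}
prefixCfg := prefix.Config{
HashBlockSize: prefix.DefaultHashBlockSize,
MaxPrefixBlocksToMatch: prefix.DefaultMaxPrefixBlocks,
LRUIndexerCapacity: prefix.DefaultLRUIndexerCapacity,
}
schedConfig := scheduling.CreateConfig(weights, prefixCfg)
scheduler := scheduling.NewSchedulerWithConfig(datastore, schedConfig)
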
8 changes: 8 additions & 0 deletions pkg/epp/scheduling/plugins/filter/filter.go
@@ -256,6 +256,14 @@ var HasCapacityFilter = &baseFilter{
filter: toFilterFunc(queueThresholdPredicate(config.Conf.QueueThresholdCritical).and(kvCacheThresholdPredicate(config.Conf.KVCacheThreshold))),
}

// NoopFilter is a filter that does not filter out any pods.
var NoopFilter = &baseFilter{
Contributor: why are we adding this back?

Contributor: ++

Collaborator: ++

name: "noop",
filter: toFilterFunc(func(req *types.LLMRequest, pod types.Pod) bool {
return true
}),
}

// podPredicate is a filter function to check whether a pod is desired.
type podPredicate func(req *types.LLMRequest, pod types.Pod) bool
