Skip to content

Commit c9d707c

Browse files
committed
add DeadlinePriority plugin in intra-flow dispatch policy
Signed-off-by: CYJiang <[email protected]>
1 parent 6ed9419 commit c9d707c

File tree

2 files changed

+118
-0
lines changed
  • pkg/epp/flowcontrol/framework

2 files changed

+118
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package edf
18+
19+
import (
20+
"time"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
25+
)
26+
27+
// EDFPolicyName is the name of the Earliest Deadline First (EDF) intra-flow dispatch policy.
28+
//
29+
// This policy implements a deadline-urgency scheduling strategy by selecting the request with the earliest absolute
30+
// deadline, computed as `EnqueueTime() + EffectiveTTL()`. Requests without a valid TTL (i.e., EffectiveTTL <= 0) are
31+
// treated as having no deadline and are scheduled after all time-bound requests, using FCFS as a tie-breaker for fairness.
32+
const EDFPolicyName = "EDF"
33+
34+
func init() {
35+
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(EDFPolicyName),
36+
func() (framework.IntraFlowDispatchPolicy, error) {
37+
return newEDFPolicy(), nil
38+
})
39+
}
40+
41+
// EDFPolicy implements an intra-flow dispatch policy based on the Earliest Deadline First (EDF) scheduling algorithm.
42+
// Requests with earlier absolute deadlines (EnqueueTime + EffectiveTTL) are dispatched first.
43+
// See the documentation for the exported `EDFPolicyName` constant for detailed behavioral guarantees.
44+
type EDFPolicy struct {
45+
comparator framework.ItemComparator
46+
}
47+
48+
var _ framework.IntraFlowDispatchPolicy = &EDFPolicy{}
49+
50+
func newEDFPolicy() framework.IntraFlowDispatchPolicy {
51+
return &EDFPolicy{
52+
comparator: &edfComparator{},
53+
}
54+
}
55+
56+
func (p *EDFPolicy) Name() string {
57+
return EDFPolicyName
58+
}
59+
60+
// RequiredQueueCapabilities returns the queue capabilities required by this policy.
61+
// It requires a priority-configurable queue (e.g., heap-based) to maintain items in deadline-sorted order.
62+
func (p *EDFPolicy) RequiredQueueCapabilities() []framework.QueueCapability {
63+
return []framework.QueueCapability{framework.CapabilityPriorityConfigurable}
64+
}
65+
66+
func (p *EDFPolicy) Comparator() framework.ItemComparator {
67+
return p.comparator
68+
}
69+
70+
// SelectItem selects the next item to dispatch by returning the head of the queue.
71+
// This assumes the underlying queue is ordered according to the policy's comparator
72+
// (enforced via RequiredQueueCapabilities). Thus, the most urgent request is always at the head.
73+
// Returns (nil, nil) if the queue is empty or nil.
74+
func (p *EDFPolicy) SelectItem(queue framework.FlowQueueAccessor) (selectedItem types.QueueItemAccessor, err error) {
75+
if queue == nil {
76+
return nil, nil
77+
}
78+
return queue.PeekHead(), nil
79+
}
80+
81+
var maxDeadlineTime = time.Unix(1<<63-60, 0)
82+
83+
// calculateDeadline computes the absolute deadline for a request.
84+
// The deadline is defined as the logical enqueue time plus the effective time-to-live (TTL).
85+
// If EffectiveTTL is zero or negative, the request is considered non-time-sensitive and assigned a
86+
// far-future deadline so it sorts after all SLO-bound requests.
87+
func calculateDeadline(item types.QueueItemAccessor) time.Time {
88+
ttl := item.EffectiveTTL()
89+
if ttl <= 0 {
90+
// No TTL: treat as "never expire", but still respect enqueue time for fairness.
91+
return maxDeadlineTime
92+
}
93+
return item.EnqueueTime().Add(ttl)
94+
}
95+
96+
type edfComparator struct{}
97+
98+
func (d *edfComparator) Func() framework.ItemComparatorFunc {
99+
return func(a, b types.QueueItemAccessor) bool {
100+
deadlineA := calculateDeadline(a)
101+
deadlineB := calculateDeadline(b)
102+
103+
if !deadlineA.Equal(deadlineB) {
104+
return deadlineA.Before(deadlineB) // earlier deadline = higher priority
105+
}
106+
107+
// Same deadline: FCFS (earlier enqueue time = higher priority)
108+
return a.EnqueueTime().Before(b.EnqueueTime())
109+
}
110+
}
111+
112+
// ScoreType indicates this policy uses EDF-based scoring.
113+
func (d *edfComparator) ScoreType() string {
114+
return string(framework.EDFPriorityScoreType)
115+
}

pkg/epp/flowcontrol/framework/policies.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ const (
2525
// EnqueueTimePriorityScoreType indicates that the priority is based on the item's enqueue time, with earlier times
2626
// being higher priority.
2727
EnqueueTimePriorityScoreType PriorityScoreType = "enqueue_time_ns_asc"
28+
29+
// EDFPriorityScoreType indicates priority scoring based on the Earliest Deadline First (EDF) scheduling policy.
30+
EDFPriorityScoreType PriorityScoreType = "earliest_deadline_first"
2831
)
2932

3033
// ItemComparatorFunc defines the function signature for comparing two `types.QueueItemAccessor` instances to determine

0 commit comments

Comments
 (0)