Skip to content

Commit e7a78e4

Browse files
authored
feature: add slo-based deadline ordering policy (kubernetes-sigs#2531)
* feature: add slo-based deadline ordering policy * Move GetHeader function to utils package * Update plugin guide
1 parent fd0a926 commit e7a78e4

File tree

7 files changed

+340
-0
lines changed

7 files changed

+340
-0
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ func (r *Runner) registerInTreePlugins() {
460460
fwkplugin.Register(fairness.RoundRobinFairnessPolicyType, fairness.RoundRobinFairnessPolicyFactory)
461461
fwkplugin.Register(ordering.FCFSOrderingPolicyType, ordering.FCFSOrderingPolicyFactory)
462462
fwkplugin.Register(ordering.EDFOrderingPolicyType, ordering.EDFOrderingPolicyFactory)
463+
fwkplugin.Register(ordering.SLODeadlineOrderingPolicyType, ordering.SLODeadlineOrderingPolicyFactory)
463464
// Latency predictor plugins
464465
fwkplugin.Register(predictedlatency.PredictedLatencyPluginType, predictedlatency.PredictedLatencyFactory)
465466
// register filter for test purpose only (used in conformance tests)

pkg/common/request/headers.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,22 @@ limitations under the License.
1616

1717
package request
1818

19+
import "strings"
20+
1921
const (
2022
RequestIdHeaderKey = "x-request-id"
2123
)
24+
25+
// GetHeader returns the value for key from headers, with case-insensitive lookup.
26+
func GetHeader(headers map[string]string, key string) string {
27+
if v, ok := headers[key]; ok {
28+
return v
29+
}
30+
lower := strings.ToLower(key)
31+
for k, v := range headers {
32+
if strings.ToLower(k) == lower {
33+
return v
34+
}
35+
}
36+
return ""
37+
}

pkg/common/request/headers_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package request
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
)
24+
25+
func TestGetHeader(t *testing.T) {
26+
headers := map[string]string{"X-SLO-TTFT-MS": "42", "Other": "x"}
27+
assert.Equal(t, "42", GetHeader(headers, "X-SLO-TTFT-MS"))
28+
assert.Equal(t, "42", GetHeader(headers, "x-slo-ttft-ms"))
29+
assert.Equal(t, "", GetHeader(headers, "missing"))
30+
assert.Equal(t, "", GetHeader(nil, "k"))
31+
}

pkg/epp/flowcontrol/framework/plugins/ordering/doc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,9 @@ limitations under the License.
5151
// - EDF ("Earliest Deadline First") ("edf-ordering-policy"): Orders requests by their absolute deadline
5252
// (EnqueueTime + TTL).
5353
// This maximizes the number of requests served before their deadlines expire.
54+
//
55+
// - SLO Deadline ("slo-deadline-ordering-policy"): Orders requests by an SLO-based (service level objective) deadline
56+
// computed as ReceivedTimestamp + x-slo-ttft-ms header (interpreted as milliseconds).
57+
// Requests without a valid header are scheduled after SLO-bound requests.
58+
// This maximizes the number of requests served before the deadlines computed on the defined SLO expire.
5459
package ordering
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ordering
18+
19+
import (
20+
"encoding/json"
21+
"strconv"
22+
"strings"
23+
"time"
24+
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/common/request"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
28+
)
29+
30+
const (
31+
// SLODeadlineOrderingPolicyType orders requests by an SLO-based deadline
32+
//
33+
// It selects the request with the earliest SLO-based deadline, computed as `ReceivedTimestamp() + x-slo-ttft-ms header (interpreted as milliseconds)`.
34+
// Requests without a valid x-slo-ttft-ms header are treated as having no deadline and are scheduled after SLO-bound requests,
35+
// with FCFS as a tie-breaker.
36+
SLODeadlineOrderingPolicyType = "slo-deadline-ordering-policy"
37+
38+
// sloTtftHeader is the request header name for SLO time-to-first-token in milliseconds.
39+
sloTtftHeader = "x-slo-ttft-ms"
40+
)
41+
42+
func SLODeadlineOrderingPolicyFactory(name string, _ json.RawMessage, _ plugin.Handle) (plugin.Plugin, error) {
43+
return newSLODeadlinePolicy().withName(name), nil
44+
}
45+
46+
type sloDeadlinePolicy struct {
47+
name string
48+
}
49+
50+
var _ flowcontrol.OrderingPolicy = &sloDeadlinePolicy{}
51+
52+
func newSLODeadlinePolicy() *sloDeadlinePolicy {
53+
return &sloDeadlinePolicy{
54+
name: SLODeadlineOrderingPolicyType,
55+
}
56+
}
57+
58+
func (p *sloDeadlinePolicy) withName(name string) *sloDeadlinePolicy {
59+
if name != "" {
60+
p.name = name
61+
}
62+
return p
63+
}
64+
65+
func (p *sloDeadlinePolicy) Name() string {
66+
return p.name
67+
}
68+
69+
// RequiredQueueCapabilities returns the queue capabilities required by this policy.
70+
func (p *sloDeadlinePolicy) RequiredQueueCapabilities() []flowcontrol.QueueCapability {
71+
return []flowcontrol.QueueCapability{flowcontrol.CapabilityPriorityConfigurable}
72+
}
73+
74+
func (p *sloDeadlinePolicy) TypedName() plugin.TypedName {
75+
return plugin.TypedName{
76+
Type: SLODeadlineOrderingPolicyType,
77+
Name: p.name,
78+
}
79+
}
80+
81+
var sloMaxDeadlineTime = time.Unix(0, 1<<63-1)
82+
83+
// calculateSLODeadline computes the SLO-based deadline for a request: ReceivedTimestamp + x-slo-ttft-ms (ms).
84+
// The header is read from the InferenceRequest()'s headers. If the header is missing, empty, or invalid,
85+
// the request is assigned a far-future deadline so it sorts after SLO-bound requests.
86+
func calculateSLODeadline(item flowcontrol.QueueItemAccessor) time.Time {
87+
req := item.OriginalRequest()
88+
if req == nil {
89+
return sloMaxDeadlineTime
90+
}
91+
infReq := req.InferenceRequest()
92+
if infReq == nil || infReq.Headers == nil {
93+
return sloMaxDeadlineTime
94+
}
95+
sloTtft := request.GetHeader(infReq.Headers, sloTtftHeader)
96+
if sloTtft == "" {
97+
return sloMaxDeadlineTime
98+
}
99+
ms, err := strconv.ParseInt(strings.TrimSpace(sloTtft), 10, 64)
100+
if err != nil || ms < 0 {
101+
return sloMaxDeadlineTime
102+
}
103+
return req.ReceivedTimestamp().Add(time.Duration(ms) * time.Millisecond)
104+
}
105+
106+
// Less returns true if item 'a' should be dispatched before item 'b'.
107+
// It orders by SLO deadline (earliest first), using FCFS as a tie-breaker.
108+
func (p *sloDeadlinePolicy) Less(a, b flowcontrol.QueueItemAccessor) bool {
109+
if a == nil && b == nil {
110+
return false
111+
}
112+
if a == nil {
113+
return false
114+
}
115+
if b == nil {
116+
return true
117+
}
118+
deadlineA := calculateSLODeadline(a)
119+
deadlineB := calculateSLODeadline(b)
120+
if !deadlineA.Equal(deadlineB) {
121+
return deadlineA.Before(deadlineB)
122+
}
123+
reqA := a.OriginalRequest()
124+
reqB := b.OriginalRequest()
125+
if reqA == nil && reqB == nil {
126+
return false
127+
}
128+
if reqA == nil {
129+
return false
130+
}
131+
if reqB == nil {
132+
return true
133+
}
134+
return reqA.ReceivedTimestamp().Before(reqB.ReceivedTimestamp())
135+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ordering
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/flowcontrol/mocks"
28+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
29+
)
30+
31+
func TestSLODeadlinePolicy_Name(t *testing.T) {
32+
t.Parallel()
33+
policy := newSLODeadlinePolicy()
34+
assert.Equal(t, SLODeadlineOrderingPolicyType, policy.Name())
35+
}
36+
37+
func TestSLODeadlinePolicy_WithName(t *testing.T) {
38+
t.Parallel()
39+
policy := newSLODeadlinePolicy().withName("test-name")
40+
assert.Equal(t, "test-name", policy.Name())
41+
}
42+
43+
func TestSLODeadlinePolicy_RequiredQueueCapabilities(t *testing.T) {
44+
t.Parallel()
45+
policy := newSLODeadlinePolicy()
46+
caps := policy.RequiredQueueCapabilities()
47+
require.Len(t, caps, 1)
48+
assert.Equal(t, flowcontrol.CapabilityPriorityConfigurable, caps[0])
49+
}
50+
51+
// makeSLOItem builds a QueueItemAccessor with the given SLO header and received time.
52+
func makeSLOItem(id string, received time.Time, sloTTFTMs string) flowcontrol.QueueItemAccessor {
53+
req := mocks.NewMockFlowControlRequest(10, id, testFlowKey)
54+
req.ReceivedTimestampV = received
55+
req.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{sloTtftHeader: sloTTFTMs}}
56+
return &mocks.MockQueueItemAccessor{
57+
EffectiveTTLV: 0,
58+
OriginalRequestV: req,
59+
}
60+
}
61+
62+
func TestSLODeadline_Less(t *testing.T) {
63+
t.Parallel()
64+
policy := newSLODeadlinePolicy()
65+
66+
now := time.Now()
67+
68+
// A: received now, 100ms SLO → deadline now+100ms
69+
itemA := makeSLOItem("a", now, "100")
70+
// B: received now, 50ms SLO → deadline now+50ms (earlier)
71+
itemB := makeSLOItem("b", now, "50")
72+
// C: received now+20ms, 50ms SLO → deadline now+20ms+50ms = now+70ms (after B but earlier than A)
73+
itemC := makeSLOItem("c", now.Add(20*time.Millisecond), "50")
74+
// D: no header → far-future deadline
75+
reqD := mocks.NewMockFlowControlRequest(10, "d", testFlowKey)
76+
reqD.ReceivedTimestampV = now
77+
reqD.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{}}
78+
itemD := &mocks.MockQueueItemAccessor{EffectiveTTLV: 0, OriginalRequestV: reqD}
79+
// E: same deadline as B (received 1s earlier + 1050ms SLO = now+50ms), earlier ReceivedTimestamp → wins tie-breaker
80+
itemE := makeSLOItem("e", now.Add(-time.Second), "1050")
81+
82+
testCases := []struct {
83+
name string
84+
a flowcontrol.QueueItemAccessor
85+
b flowcontrol.QueueItemAccessor
86+
expected bool
87+
}{
88+
{"earlier SLO deadline first (B before A)", itemB, itemA, true},
89+
{"later SLO deadline after (A after B)", itemA, itemB, false},
90+
{"received later but earlier deadline (C before A)", itemC, itemA, true},
91+
{"SLO-bound before no-header (A before D)", itemA, itemD, true},
92+
{"no-header after SLO-bound (D after A)", itemD, itemA, false},
93+
{"same deadline: earlier ReceivedTimestamp first (E before B)", itemE, itemB, true},
94+
{"same deadline: later ReceivedTimestamp after (B after E)", itemB, itemE, false},
95+
{"a is nil → b wins", nil, itemA, false},
96+
{"b is nil → a wins", itemA, nil, true},
97+
{"both nil → false", nil, nil, false},
98+
}
99+
100+
for _, tc := range testCases {
101+
t.Run(tc.name, func(t *testing.T) {
102+
t.Parallel()
103+
assert.Equal(t, tc.expected, policy.Less(tc.a, tc.b))
104+
})
105+
}
106+
}
107+
108+
func TestCalculateSLODeadline(t *testing.T) {
109+
t.Parallel()
110+
111+
now := time.Now()
112+
113+
// Valid header
114+
reqValid := mocks.NewMockFlowControlRequest(1, "valid", testFlowKey)
115+
reqValid.ReceivedTimestampV = now
116+
reqValid.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{sloTtftHeader: "200"}}
117+
accValid := &mocks.MockQueueItemAccessor{OriginalRequestV: reqValid}
118+
deadline := calculateSLODeadline(accValid)
119+
assert.Equal(t, now.Add(200*time.Millisecond), deadline)
120+
121+
// Missing header
122+
reqNoHeader := mocks.NewMockFlowControlRequest(2, "no", testFlowKey)
123+
reqNoHeader.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{}}
124+
accNoHeader := &mocks.MockQueueItemAccessor{OriginalRequestV: reqNoHeader}
125+
assert.Equal(t, sloMaxDeadlineTime, calculateSLODeadline(accNoHeader))
126+
127+
reqNoHeader.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{"x-some-header": "200"}}
128+
accNoHeader = &mocks.MockQueueItemAccessor{OriginalRequestV: reqNoHeader}
129+
assert.Equal(t, sloMaxDeadlineTime, calculateSLODeadline(accNoHeader))
130+
131+
// Invalid value
132+
reqInvalid := mocks.NewMockFlowControlRequest(3, "inv", testFlowKey)
133+
reqInvalid.InferenceRequestV = &scheduling.LLMRequest{Headers: map[string]string{sloTtftHeader: "x"}}
134+
accInvalid := &mocks.MockQueueItemAccessor{OriginalRequestV: reqInvalid}
135+
assert.Equal(t, sloMaxDeadlineTime, calculateSLODeadline(accInvalid))
136+
137+
// Nil OriginalRequest
138+
accNilReq := &mocks.MockQueueItemAccessor{OriginalRequestV: nil}
139+
assert.Equal(t, sloMaxDeadlineTime, calculateSLODeadline(accNilReq))
140+
141+
// Nil InferenceRequest
142+
reqNilInfReq := mocks.NewMockFlowControlRequest(4, "no-inf-req", testFlowKey)
143+
accNoInfReq := &mocks.MockQueueItemAccessor{OriginalRequestV: reqNilInfReq}
144+
assert.Equal(t, sloMaxDeadlineTime, calculateSLODeadline(accNoInfReq))
145+
}

site-src/guides/epp-configuration/config-text.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,13 @@ An Ordering Policy that implements Earliest Deadline First. It prioritizes reque
312312
- *Type*: edf-ordering-policy
313313
- *Parameters*: none
314314

315+
#### SLODeadlineOrderingPolicy
316+
317+
An Ordering Policy that orders requests by an SLO-based deadline, computed from the time the request is received by the server. It prioritizes requests with the earliest such deadline.
318+
319+
- *Type*: slo-deadline-ordering-policy
320+
- *Parameters*: none
321+
315322
## Scheduling Profiles
316323

317324
The `schedulingProfiles` section defines the set of scheduling profiles that can be used in scheduling

0 commit comments

Comments
 (0)