Skip to content

Commit 7d3dbb0

Browse files
committed
http2: buffer the most recently received PRIORITY_UPDATE frame
Per RFC 9218, servers should buffer the most recently received PRIORITY_UPDATE frame. This CL implements said buffering within the RFC 9218 priority write scheduler. For golang/go#75500 Change-Id: I259f4f6787053de6388ec513086cfa1b294fa607 Reviewed-on: https://go-review.googlesource.com/c/net/+/728401 LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Damien Neil <dneil@google.com> Reviewed-by: Nicholas Husin <husin@google.com>
1 parent 35e1306 commit 7d3dbb0

File tree

2 files changed

+78
-0
lines changed

2 files changed

+78
-0
lines changed

http2/writesched_priority_rfc9218.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ type priorityWriteSchedulerRFC9218 struct {
3737
// incremental streams or not, when urgency is the same in a given Pop()
3838
// call.
3939
prioritizeIncremental bool
40+
41+
// priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we
42+
// receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
43+
priorityUpdateBuf struct {
44+
// streamID being 0 means that the buffer is empty. This is a safe
45+
// assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
46+
streamID uint32
47+
priority PriorityParam
48+
}
4049
}
4150

4251
func newPriorityWriteSchedulerRFC9218() WriteScheduler {
@@ -50,6 +59,10 @@ func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStr
5059
if ws.streams[streamID].location != nil {
5160
panic(fmt.Errorf("stream %d already opened", streamID))
5261
}
62+
if streamID == ws.priorityUpdateBuf.streamID {
63+
ws.priorityUpdateBuf.streamID = 0
64+
opt.priority = ws.priorityUpdateBuf.priority
65+
}
5366
q := ws.queuePool.get()
5467
ws.streams[streamID] = streamMetadata{
5568
location: q,
@@ -95,6 +108,8 @@ func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority
95108
metadata := ws.streams[streamID]
96109
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
97110
if q == nil {
111+
ws.priorityUpdateBuf.streamID = streamID
112+
ws.priorityUpdateBuf.priority = priority
98113
return
99114
}
100115

http2/writesched_priority_rfc9218_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,66 @@ func TestPrioritySchedulerIdempotentUpdate(t *testing.T) {
324324
t.Fatalf("popped streams %v, want %v", got, want)
325325
}
326326
}
327+
328+
func TestPrioritySchedulerBuffersPriorityUpdate(t *testing.T) {
329+
const maxFrameSize = 16
330+
sc := &serverConn{maxFrameSize: maxFrameSize}
331+
ws := newPriorityWriteSchedulerRFC9218()
332+
333+
// Priorities are adjusted for streams that are not open yet.
334+
ws.AdjustStream(1, PriorityParam{urgency: 0})
335+
ws.AdjustStream(5, PriorityParam{urgency: 0})
336+
for _, streamID := range []uint32{1, 3, 5} {
337+
stream := &stream{
338+
id: streamID,
339+
sc: sc,
340+
}
341+
stream.flow.add(1 << 20) // arbitrary large value
342+
ws.OpenStream(streamID, OpenStreamOptions{
343+
priority: PriorityParam{
344+
urgency: 7,
345+
incremental: 1,
346+
},
347+
})
348+
wr := FrameWriteRequest{
349+
write: &writeData{
350+
streamID: streamID,
351+
p: make([]byte, maxFrameSize*(3)),
352+
endStream: false,
353+
},
354+
stream: stream,
355+
}
356+
ws.Push(wr)
357+
}
358+
359+
const controlFrames = 2
360+
for range controlFrames {
361+
ws.Push(makeWriteNonStreamRequest())
362+
}
363+
364+
// We should get the control frames first.
365+
for range controlFrames {
366+
wr, ok := ws.Pop()
367+
if !ok || wr.StreamID() != 0 {
368+
t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
369+
}
370+
}
371+
372+
// The most recent priority adjustment is buffered and applied. Older ones
373+
// are ignored.
374+
want := []uint32{5, 5, 5, 1, 3, 1, 3, 1, 3}
375+
var got []uint32
376+
for {
377+
wr, ok := ws.Pop()
378+
if !ok {
379+
break
380+
}
381+
if wr.DataSize() != maxFrameSize {
382+
t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
383+
}
384+
got = append(got, wr.StreamID())
385+
}
386+
if !reflect.DeepEqual(got, want) {
387+
t.Fatalf("popped streams %v, want %v", got, want)
388+
}
389+
}

0 commit comments

Comments
 (0)