Skip to content

Commit bff14c5

Browse files
committed
http2: don't PING a responsive server when resetting a stream
When sending a RST_STREAM for a canceled request, we sometimes send a PING frame along with the reset to confirm that the server is responsive and has received the reset. Sending too many PINGs trips denial-of-service detection on some servers, causing them to close a connection with an ENHANCE_YOUR_CALM error.

Do not send a PING frame along with an RST_STREAM if the connection has displayed signs of life since the canceled request began. Specifically, if we've received any stream-related frames since the request was sent, assume the server is responsive and do not send a PING. We still send a PING if a request is canceled and no stream-related frames have been received from the server since the request was first sent.

For golang/go#76296

Change-Id: I1be3532febf9ac99d65e9cd35346c02306db5f9d
Reviewed-on: https://go-review.googlesource.com/c/net/+/720300
Reviewed-by: Nicholas Husin <husin@google.com>
Reviewed-by: Nicholas Husin <nsh@golang.org>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
1 parent 88a6421 commit bff14c5

File tree

3 files changed

+72
-26
lines changed

3 files changed

+72
-26
lines changed

http2/clientconn_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,12 @@ func (b *testRequestBody) closeWithError(err error) {
295295
// (Note that the RoundTrip won't complete until response headers are received,
296296
// the request times out, or some other terminal condition is reached.)
297297
func (tc *testClientConn) roundTrip(req *http.Request) *testRoundTrip {
298+
ctx, cancel := context.WithCancel(req.Context())
299+
req = req.WithContext(ctx)
298300
rt := &testRoundTrip{
299-
t: tc.t,
300-
donec: make(chan struct{}),
301+
t: tc.t,
302+
donec: make(chan struct{}),
303+
cancel: cancel,
301304
}
302305
tc.roundtrips = append(tc.roundtrips, rt)
303306
go func() {
@@ -367,6 +370,7 @@ type testRoundTrip struct {
367370
respErr error
368371
donec chan struct{}
369372
id atomic.Uint32
373+
cancel context.CancelFunc
370374
}
371375

372376
// streamID returns the HTTP/2 stream ID of the request.

http2/transport.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,13 @@ type ClientConn struct {
376376
// completely unresponsive connection.
377377
pendingResets int
378378

379+
// readBeforeStreamID is the smallest stream ID that has not been followed by
380+
// a frame read from the peer. We use this to determine when a request may
381+
// have been sent to a completely unresponsive connection:
382+
// If the request ID is less than readBeforeStreamID, then we have had some
383+
// indication of life on the connection since sending the request.
384+
readBeforeStreamID uint32
385+
379386
// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
380387
// Write to reqHeaderMu to lock it, read from it to unlock.
381388
// Lock reqmu BEFORE mu or wmu.
@@ -1654,6 +1661,8 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
16541661
}
16551662
bodyClosed := cs.reqBodyClosed
16561663
closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
1664+
// Have we read any frames from the connection since sending this request?
1665+
readSinceStream := cc.readBeforeStreamID > cs.ID
16571666
cc.mu.Unlock()
16581667
if mustCloseBody {
16591668
cs.reqBody.Close()
@@ -1685,8 +1694,10 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
16851694
//
16861695
// This could be due to the server becoming unresponsive.
16871696
// To avoid sending too many requests on a dead connection,
1688-
// we let the request continue to consume a concurrency slot
1689-
// until we can confirm the server is still responding.
1697+
// if we haven't read any frames from the connection since
1698+
// sending this request, we let it continue to consume
1699+
// a concurrency slot until we can confirm the server is
1700+
// still responding.
16901701
// We do this by sending a PING frame along with the RST_STREAM
16911702
// (unless a ping is already in flight).
16921703
//
@@ -1697,7 +1708,7 @@ func (cs *clientStream) cleanupWriteRequest(err error) {
16971708
// because it's short lived and will probably be closed before
16981709
// we get the ping response.
16991710
ping := false
1700-
if !closeOnIdle {
1711+
if !closeOnIdle && !readSinceStream {
17011712
cc.mu.Lock()
17021713
// rstStreamPingsBlocked works around a gRPC behavior:
17031714
// see comment on the field for details.
@@ -2784,6 +2795,7 @@ func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientSt
27842795
// See comment on ClientConn.rstStreamPingsBlocked for details.
27852796
rl.cc.rstStreamPingsBlocked = false
27862797
}
2798+
rl.cc.readBeforeStreamID = rl.cc.nextStreamID
27872799
cs := rl.cc.streams[id]
27882800
if cs != nil && !cs.readAborted {
27892801
return cs

http2/transport_test.go

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2563,9 +2563,6 @@ func testTransportReturnsUnusedFlowControl(t testing.TB, oneDataFrame bool) {
25632563
}
25642564
return true
25652565
},
2566-
func(f *PingFrame) bool {
2567-
return true
2568-
},
25692566
func(f *WindowUpdateFrame) bool {
25702567
if !oneDataFrame && !sentAdditionalData {
25712568
t.Fatalf("Got WindowUpdateFrame, don't expect one yet")
@@ -5567,6 +5564,8 @@ func TestTransport1xxLimits(t *testing.T) {
55675564
}
55685565
}
55695566

5567+
// TestTransportSendPingWithReset verifies that when a request to an unresponsive server
5568+
// is canceled, it continues to consume a concurrency slot until the server responds to a PING.
55705569
func TestTransportSendPingWithReset(t *testing.T) { synctestTest(t, testTransportSendPingWithReset) }
55715570
func testTransportSendPingWithReset(t testing.TB) {
55725571
tc := newTestClientConn(t, func(tr *Transport) {
@@ -5578,33 +5577,25 @@ func testTransportSendPingWithReset(t testing.TB) {
55785577

55795578
// Start several requests.
55805579
var rts []*testRoundTrip
5581-
for i := 0; i < maxConcurrent+1; i++ {
5580+
for i := range maxConcurrent + 1 {
55825581
req := must(http.NewRequest("GET", "https://dummy.tld/", nil))
55835582
rt := tc.roundTrip(req)
55845583
if i >= maxConcurrent {
55855584
tc.wantIdle()
55865585
continue
55875586
}
55885587
tc.wantFrameType(FrameHeaders)
5589-
tc.writeHeaders(HeadersFrameParam{
5590-
StreamID: rt.streamID(),
5591-
EndHeaders: true,
5592-
BlockFragment: tc.makeHeaderBlockFragment(
5593-
":status", "200",
5594-
),
5595-
})
5596-
rt.wantStatus(200)
55975588
rts = append(rts, rt)
55985589
}
55995590

56005591
// Cancel one request. We send a PING frame along with the RST_STREAM.
5601-
rts[0].response().Body.Close()
5592+
rts[0].cancel()
56025593
tc.wantRSTStream(rts[0].streamID(), ErrCodeCancel)
56035594
pf := readFrame[*PingFrame](t, tc)
56045595
tc.wantIdle()
56055596

56065597
// Cancel another request. No PING frame, since one is in flight.
5607-
rts[1].response().Body.Close()
5598+
rts[1].cancel()
56085599
tc.wantRSTStream(rts[1].streamID(), ErrCodeCancel)
56095600
tc.wantIdle()
56105601

@@ -5613,16 +5604,55 @@ func testTransportSendPingWithReset(t testing.TB) {
56135604
tc.writePing(true, pf.Data)
56145605
tc.wantFrameType(FrameHeaders)
56155606
tc.wantIdle()
5607+
}
56165608

5617-
// Receive a byte of data for the remaining stream, which resets our ability
5618-
// to send pings (see comment on ClientConn.rstStreamPingsBlocked).
5619-
tc.writeData(rts[2].streamID(), false, []byte{0})
5609+
// TestTransportNoPingAfterResetWithFrames verifies that when a request to a responsive
5610+
// server is canceled (specifically: when frames have been received from the server
5611+
// in the time since the request was first sent), the request is immediately canceled and
5612+
// does not continue to consume a concurrency slot.
5613+
func TestTransportNoPingAfterResetWithFrames(t *testing.T) {
5614+
synctestTest(t, testTransportNoPingAfterResetWithFrames)
5615+
}
5616+
func testTransportNoPingAfterResetWithFrames(t testing.TB) {
5617+
tc := newTestClientConn(t, func(tr *Transport) {
5618+
tr.StrictMaxConcurrentStreams = true
5619+
})
56205620

5621-
// Cancel the last request. We send another PING, since none are in flight.
5622-
rts[2].response().Body.Close()
5623-
tc.wantRSTStream(rts[2].streamID(), ErrCodeCancel)
5624-
tc.wantFrameType(FramePing)
5621+
const maxConcurrent = 1
5622+
tc.greet(Setting{SettingMaxConcurrentStreams, maxConcurrent})
5623+
5624+
// Start request #1.
5625+
// The server immediately responds with request headers.
5626+
req1 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
5627+
rt1 := tc.roundTrip(req1)
5628+
tc.wantFrameType(FrameHeaders)
5629+
tc.writeHeaders(HeadersFrameParam{
5630+
StreamID: rt1.streamID(),
5631+
EndHeaders: true,
5632+
BlockFragment: tc.makeHeaderBlockFragment(
5633+
":status", "200",
5634+
),
5635+
})
5636+
rt1.wantStatus(200)
5637+
5638+
// Start request #2.
5639+
// The connection is at its concurrency limit, so this request is not yet sent.
5640+
req2 := must(http.NewRequest("GET", "https://dummy.tld/", nil))
5641+
rt2 := tc.roundTrip(req2)
56255642
tc.wantIdle()
5643+
5644+
// Cancel request #1.
5645+
// This frees a concurrency slot, and request #2 is sent.
5646+
rt1.cancel()
5647+
tc.wantRSTStream(rt1.streamID(), ErrCodeCancel)
5648+
tc.wantFrameType(FrameHeaders)
5649+
5650+
// Cancel request #2.
5651+
// We send a PING along with the RST_STREAM, since no frames have been received
5652+
// since this request was sent.
5653+
rt2.cancel()
5654+
tc.wantRSTStream(rt2.streamID(), ErrCodeCancel)
5655+
tc.wantFrameType(FramePing)
56265656
}
56275657

56285658
// Issue #70505: gRPC gets upset if we send more than 2 pings per HEADERS/DATA frame

0 commit comments

Comments (0)