From c9290b76961c46f476761ccadce0cb006d474b92 Mon Sep 17 00:00:00 2001 From: jwilson Date: Sat, 21 May 2016 14:24:02 -0400 Subject: [PATCH] Recover from REFUSED_STREAM errors in HTTP/2. This implements the following policy: - If a REFUSED_STREAM error is received, OkHttp will retry the same stream on the same socket 1x. - If any other error is received, or an additional REFUSED_STREAM error is received, OkHttp will retry on a different route if one exists. We may want to follow up by going through HTTP/2 error codes and deciding a per-code retry policy, but this should be good enough for now. Closes: https://github.com/square/okhttp/issues/2543 --- .../okhttp3/mockwebserver/MockResponse.java | 15 +++ .../okhttp3/mockwebserver/MockWebServer.java | 19 +++- .../okhttp3/mockwebserver/SocketPolicy.java | 8 +- .../test/java/okhttp3/URLConnectionTest.java | 9 +- .../internal/framed/HttpOverSpdyTest.java | 98 ++++++++++++++++- .../okhttp3/internal/framed/FramedStream.java | 6 +- .../internal/framed/StreamResetException.java | 28 +++++ .../okhttp3/internal/http/HttpEngine.java | 57 +++++++++- .../internal/http/StreamAllocation.java | 102 +++++++----------- 9 files changed, 264 insertions(+), 78 deletions(-) create mode 100644 okhttp/src/main/java/okhttp3/internal/framed/StreamResetException.java diff --git a/mockwebserver/src/main/java/okhttp3/mockwebserver/MockResponse.java b/mockwebserver/src/main/java/okhttp3/mockwebserver/MockResponse.java index 9f56b34f9996..1826aa4fff04 100644 --- a/mockwebserver/src/main/java/okhttp3/mockwebserver/MockResponse.java +++ b/mockwebserver/src/main/java/okhttp3/mockwebserver/MockResponse.java @@ -38,6 +38,7 @@ public final class MockResponse implements Cloneable { private TimeUnit throttlePeriodUnit = TimeUnit.SECONDS; private SocketPolicy socketPolicy = SocketPolicy.KEEP_OPEN; + private int http2ErrorCode = -1; private long bodyDelayAmount = 0; private TimeUnit bodyDelayUnit = TimeUnit.MILLISECONDS; @@ -205,6 +206,20 @@ public MockResponse setSocketPolicy(SocketPolicy socketPolicy) { return this; } + public int getHttp2ErrorCode() { + return http2ErrorCode; + } + + /** + * Sets the HTTP/2 error code to be + * returned when resetting the stream. This is only valid with {@link + * SocketPolicy#RESET_STREAM_AT_START}. + */ + public MockResponse setHttp2ErrorCode(int http2ErrorCode) { + this.http2ErrorCode = http2ErrorCode; + return this; + } + /** * Throttles the request reader and response writer to sleep for the given period after each * series of {@code bytesPerPeriod} bytes are transferred. Use this to simulate network behavior. diff --git a/mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.java b/mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.java index aa373f8b0197..89df44253403 100644 --- a/mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.java +++ b/mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.java @@ -18,6 +18,7 @@ package okhttp3.mockwebserver; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ProtocolException; @@ -88,6 +89,7 @@ import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY; import static okhttp3.mockwebserver.SocketPolicy.FAIL_HANDSHAKE; import static okhttp3.mockwebserver.SocketPolicy.NO_RESPONSE; +import static okhttp3.mockwebserver.SocketPolicy.RESET_STREAM_AT_START; import static okhttp3.mockwebserver.SocketPolicy.SHUTDOWN_INPUT_AT_END; import static okhttp3.mockwebserver.SocketPolicy.SHUTDOWN_OUTPUT_AT_END; import static okhttp3.mockwebserver.SocketPolicy.UPGRADE_TO_SSL_AT_END; @@ -562,8 +564,11 @@ private void processHandshakeFailure(Socket raw) throws Exception { private void dispatchBookkeepingRequest(int sequenceNumber, Socket socket) throws InterruptedException { + RecordedRequest request = new RecordedRequest( + null, null, null, -1, null, sequenceNumber, socket); requestCount.incrementAndGet(); - dispatcher.dispatch(new RecordedRequest(null, null, null, -1, null, sequenceNumber, socket)); + requestQueue.add(request); + dispatcher.dispatch(request); } /** @param sequenceNumber the index of this request on this connection. */ @@ -843,7 +848,19 @@ private FramedSocketHandler(Socket socket, Protocol protocol) { } @Override public void onStream(FramedStream stream) throws IOException { + MockResponse peekedResponse = dispatcher.peek(); + if (peekedResponse.getSocketPolicy() == RESET_STREAM_AT_START) { + try { + dispatchBookkeepingRequest(sequenceNumber.getAndIncrement(), socket); + stream.close(ErrorCode.fromHttp2(peekedResponse.getHttp2ErrorCode())); + return; + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + RecordedRequest request = readRequest(stream); + requestCount.incrementAndGet(); requestQueue.add(request); MockResponse response; try { diff --git a/mockwebserver/src/main/java/okhttp3/mockwebserver/SocketPolicy.java b/mockwebserver/src/main/java/okhttp3/mockwebserver/SocketPolicy.java index 3142cb0767a6..eddb7557a45c 100644 --- a/mockwebserver/src/main/java/okhttp3/mockwebserver/SocketPolicy.java +++ b/mockwebserver/src/main/java/okhttp3/mockwebserver/SocketPolicy.java @@ -88,5 +88,11 @@ public enum SocketPolicy { * Don't respond to the request but keep the socket open. For testing read response header timeout * issue. */ - NO_RESPONSE + NO_RESPONSE, + + /** + * Fail HTTP/2 requests without processing them by sending an {@linkplain + * MockResponse#getHttp2ErrorCode() HTTP/2 error code}. + */ + RESET_STREAM_AT_START } diff --git a/okhttp-tests/src/test/java/okhttp3/URLConnectionTest.java b/okhttp-tests/src/test/java/okhttp3/URLConnectionTest.java index f3c69dc51e71..2c3c8bde1e68 100644 --- a/okhttp-tests/src/test/java/okhttp3/URLConnectionTest.java +++ b/okhttp-tests/src/test/java/okhttp3/URLConnectionTest.java @@ -631,9 +631,12 @@ private void doUpload(TransferKind uploadKind, WriteKind writeKind) throws Excep assertContent("this response comes via SSL", connection); - RecordedRequest request = server.takeRequest(); - assertEquals("GET /foo HTTP/1.1", request.getRequestLine()); - assertEquals(TlsVersion.TLS_1_0, request.getTlsVersion()); + RecordedRequest failHandshakeRequest = server.takeRequest(); + assertNull(failHandshakeRequest.getRequestLine()); + + RecordedRequest fallbackRequest = server.takeRequest(); + assertEquals("GET /foo HTTP/1.1", fallbackRequest.getRequestLine()); + assertEquals(TlsVersion.TLS_1_0, fallbackRequest.getTlsVersion()); } @Test public void connectViaHttpsWithSSLFallbackFailuresRecorded() throws Exception { diff --git a/okhttp-tests/src/test/java/okhttp3/internal/framed/HttpOverSpdyTest.java b/okhttp-tests/src/test/java/okhttp3/internal/framed/HttpOverSpdyTest.java index 7c27cd29d7ee..c189d6be1a3b 100644 --- a/okhttp-tests/src/test/java/okhttp3/internal/framed/HttpOverSpdyTest.java +++ b/okhttp-tests/src/test/java/okhttp3/internal/framed/HttpOverSpdyTest.java @@ -34,17 +34,19 @@ import okhttp3.OkHttpClient; import okhttp3.Protocol; import okhttp3.RecordingCookieJar; +import okhttp3.RecordingHostnameVerifier; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import okhttp3.internal.DoubleInetAddressDns; import okhttp3.internal.RecordingOkAuthenticator; +import okhttp3.internal.SingleInetAddressDns; import okhttp3.internal.SslContextBuilder; import okhttp3.internal.Util; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import okhttp3.mockwebserver.SocketPolicy; -import okhttp3.RecordingHostnameVerifier; import okio.Buffer; import okio.BufferedSink; import okio.GzipSink; @@ -86,6 +88,7 @@ protected HttpOverSpdyTest(Protocol protocol) { cache = new Cache(tempDir.getRoot(), Integer.MAX_VALUE); client = new OkHttpClient.Builder() .protocols(Arrays.asList(protocol, Protocol.HTTP_1_1)) + .dns(new SingleInetAddressDns()) .sslSocketFactory(sslContext.getSocketFactory()) .hostnameVerifier(hostnameVerifier) .build(); @@ -582,6 +585,99 @@ protected HttpOverSpdyTest(Protocol protocol) { assertEquals(0, server.takeRequest().getSequenceNumber()); } + @Test public void recoverFromOneRefusedStreamReusesConnection() throws Exception { + server.enqueue(new MockResponse() + .setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START) + .setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode)); + server.enqueue(new MockResponse() + .setBody("abc")); + + Call call = client.newCall(new Request.Builder() + .url(server.url("/")) + .build()); + Response response = call.execute(); + assertEquals("abc", response.body().string()); + + assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection. + assertEquals(1, server.takeRequest().getSequenceNumber()); // Reused connection. + } + + @Test public void recoverFromOneInternalErrorRequiresNewConnection() throws Exception { + server.enqueue(new MockResponse() + .setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START) + .setHttp2ErrorCode(ErrorCode.INTERNAL_ERROR.httpCode)); + server.enqueue(new MockResponse() + .setBody("abc")); + + client = client.newBuilder() + .dns(new DoubleInetAddressDns()) + .build(); + + Call call = client.newCall(new Request.Builder() + .url(server.url("/")) + .build()); + Response response = call.execute(); + assertEquals("abc", response.body().string()); + + assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection. + assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection. + } + + @Test public void recoverFromMultipleRefusedStreamsRequiresNewConnection() throws Exception { + server.enqueue(new MockResponse() + .setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START) + .setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode)); + server.enqueue(new MockResponse() + .setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START) + .setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode)); + server.enqueue(new MockResponse() + .setBody("abc")); + + client = client.newBuilder() + .dns(new DoubleInetAddressDns()) + .build(); + + Call call = client.newCall(new Request.Builder() + .url(server.url("/")) + .build()); + Response response = call.execute(); + assertEquals("abc", response.body().string()); + + assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection. + assertEquals(1, server.takeRequest().getSequenceNumber()); // Reused connection. + assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection. + } + + @Test public void noRecoveryFromRefusedStreamWithRetryDisabled() throws Exception { + noRecoveryFromErrorWithRetryDisabled(ErrorCode.REFUSED_STREAM); + } + + @Test public void noRecoveryFromInternalErrorWithRetryDisabled() throws Exception { + noRecoveryFromErrorWithRetryDisabled(ErrorCode.INTERNAL_ERROR); + } + + private void noRecoveryFromErrorWithRetryDisabled(ErrorCode errorCode) throws Exception { + server.enqueue(new MockResponse() + .setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START) + .setHttp2ErrorCode(errorCode.httpCode)); + server.enqueue(new MockResponse() + .setBody("abc")); + + client = client.newBuilder() + .retryOnConnectionFailure(false) + .build(); + + Call call = client.newCall(new Request.Builder() + .url(server.url("/")) + .build()); + try { + call.execute(); + fail(); + } catch (StreamResetException expected) { + assertEquals(errorCode, expected.errorCode); + } + } + public Buffer gzip(String bytes) throws IOException { Buffer bytesOut = new Buffer(); BufferedSink sink = Okio.buffer(new GzipSink(bytesOut)); diff --git a/okhttp/src/main/java/okhttp3/internal/framed/FramedStream.java b/okhttp/src/main/java/okhttp3/internal/framed/FramedStream.java index c94a8449eced..d6ce15eeb68e 100644 --- a/okhttp/src/main/java/okhttp3/internal/framed/FramedStream.java +++ b/okhttp/src/main/java/okhttp3/internal/framed/FramedStream.java @@ -142,7 +142,7 @@ public synchronized List
getResponseHeaders() throws IOException { readTimeout.exitAndThrowIfTimedOut(); } if (responseHeaders != null) return responseHeaders; - throw new IOException("stream was reset: " + errorCode); + throw new StreamResetException(errorCode); } /** @@ -438,7 +438,7 @@ private void checkNotClosed() throws IOException { throw new IOException("stream closed"); } if (errorCode != null) { - throw new IOException("stream was reset: " + errorCode); + throw new StreamResetException(errorCode); } } } @@ -571,7 +571,7 @@ private void checkOutNotClosed() throws IOException { } else if (sink.finished) { throw new IOException("stream finished"); } else if (errorCode != null) { - throw new IOException("stream was reset: " + errorCode); + throw new StreamResetException(errorCode); } } diff --git a/okhttp/src/main/java/okhttp3/internal/framed/StreamResetException.java b/okhttp/src/main/java/okhttp3/internal/framed/StreamResetException.java new file mode 100644 index 000000000000..a7af0e4b41b2 --- /dev/null +++ b/okhttp/src/main/java/okhttp3/internal/framed/StreamResetException.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.internal.framed; + +import java.io.IOException; + +/** Thrown when an HTTP/2 stream is canceled without damage to the socket that carries it. */ +public final class StreamResetException extends IOException { + public final ErrorCode errorCode; + + public StreamResetException(ErrorCode errorCode) { + super("stream was reset: " + errorCode); + this.errorCode = errorCode; + } +} diff --git a/okhttp/src/main/java/okhttp3/internal/http/HttpEngine.java b/okhttp/src/main/java/okhttp3/internal/http/HttpEngine.java index 17ee87a4a447..d29640d9461c 100644 --- a/okhttp/src/main/java/okhttp3/internal/http/HttpEngine.java +++ b/okhttp/src/main/java/okhttp3/internal/http/HttpEngine.java @@ -17,11 +17,16 @@ package okhttp3.internal.http; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.ProtocolException; import java.net.Proxy; +import java.net.SocketTimeoutException; +import java.security.cert.CertificateException; import java.util.Date; import java.util.List; import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSocketFactory; import okhttp3.Address; import okhttp3.CertificatePinner; @@ -352,12 +357,22 @@ public Connection getConnection() { * permanent. Requests with a body can only be recovered if the body is buffered. */ public HttpEngine recover(IOException e, boolean routeException, Sink requestBodyOut) { - if (!streamAllocation.recover(e, routeException, requestBodyOut)) { - return null; - } + streamAllocation.streamFailed(e); if (!client.retryOnConnectionFailure()) { - return null; + return null; // The application layer has forbidden retries. + } + + if (requestBodyOut != null && !(requestBodyOut instanceof RetryableSink)) { + return null; // The body on this request cannot be retried. + } + + if (!isRecoverable(e, routeException)) { + return null; // This exception is fatal. + } + + if (!streamAllocation.hasMoreRoutes()) { + return null; // No more routes to attempt. } StreamAllocation streamAllocation = close(); @@ -371,6 +386,38 @@ public HttpEngine recover(IOException e, boolean routeException) { return recover(e, routeException, requestBodyOut); } + private boolean isRecoverable(IOException e, boolean routeException) { + // If there was a protocol problem, don't recover. + if (e instanceof ProtocolException) { + return false; + } + + // If there was an interruption don't recover, but if there was a timeout connecting to a route + // we should try the next route (if there is one). + if (e instanceof InterruptedIOException) { + return e instanceof SocketTimeoutException && routeException; + } + + // Look for known client-side or negotiation errors that are unlikely to be fixed by trying + // again with a different route. + if (e instanceof SSLHandshakeException) { + // If the problem was a CertificateException from the X509TrustManager, + // do not retry. + if (e.getCause() instanceof CertificateException) { + return false; + } + } + if (e instanceof SSLPeerUnverifiedException) { + // e.g. a certificate pinning error. + return false; + } + + // An example of one we might want to retry with a different route is a problem connecting to a + // proxy and would manifest as a standard IOException. Unless it is one we know we should not + // retry, we return true and try a new route. + return true; + } + private void maybeCache() throws IOException { InternalCache responseCache = Internal.instance.internalCache(client); if (responseCache == null) return; @@ -428,7 +475,7 @@ public StreamAllocation close() { closeQuietly(userResponse.body()); } else { // If this engine never achieved a response body, its stream allocation is dead. - streamAllocation.connectionFailed(null); + streamAllocation.streamFailed(null); } return streamAllocation; diff --git a/okhttp/src/main/java/okhttp3/internal/http/StreamAllocation.java b/okhttp/src/main/java/okhttp3/internal/http/StreamAllocation.java index 385dc2b2ac71..aee74cf0bb1d 100644 --- a/okhttp/src/main/java/okhttp3/internal/http/StreamAllocation.java +++ b/okhttp/src/main/java/okhttp3/internal/http/StreamAllocation.java @@ -16,22 +16,17 @@ package okhttp3.internal.http; import java.io.IOException; -import java.io.InterruptedIOException; import java.lang.ref.Reference; import java.lang.ref.WeakReference; -import java.net.ProtocolException; -import java.net.SocketTimeoutException; -import java.security.cert.CertificateException; -import javax.net.ssl.SSLHandshakeException; -import javax.net.ssl.SSLPeerUnverifiedException; import okhttp3.Address; import okhttp3.ConnectionPool; import okhttp3.Route; import okhttp3.internal.Internal; import okhttp3.internal.RouteDatabase; import okhttp3.internal.Util; +import okhttp3.internal.framed.ErrorCode; +import okhttp3.internal.framed.StreamResetException; import okhttp3.internal.io.RealConnection; -import okio.Sink; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -78,7 +73,8 @@ public final class StreamAllocation { private final ConnectionPool connectionPool; // State guarded by connectionPool. - private RouteSelector routeSelector; + private final RouteSelector routeSelector; + private int refusedStreamCount; private RealConnection connection; private boolean released; private boolean canceled; @@ -134,12 +130,14 @@ private RealConnection findHealthyConnection(int connectTimeout, int readTimeout } } - // Otherwise do a potentially-slow check to confirm that the pooled connection is still good. - if (candidate.isHealthy(doExtensiveHealthChecks)) { - return candidate; + // Do a (potentially slow) check to confirm that the pooled connection is still good. If it + // isn't, take it out of the pool and start again. + if (!candidate.isHealthy(doExtensiveHealthChecks)) { + noNewStreams(); + continue; } - connectionFailed(new IOException()); + return candidate; } } @@ -174,6 +172,7 @@ private RealConnection findConnection(int connectTimeout, int readTimeout, int w selectedRoute = routeSelector.next(); synchronized (connectionPool) { route = selectedRoute; + refusedStreamCount = 0; } } RealConnection newConnection = new RealConnection(selectedRoute); @@ -276,17 +275,35 @@ public void cancel() { } } - public void connectionFailed(IOException e) { + public void streamFailed(IOException e) { + boolean noNewStreams = false; + synchronized (connectionPool) { - // Avoid this route if it's never seen a successful call. - if (connection != null && connection.successCount == 0) { - if (route != null && e != null) { - routeSelector.connectFailed(route, e); + if (e instanceof StreamResetException) { + StreamResetException streamResetException = (StreamResetException) e; + if (streamResetException.errorCode == ErrorCode.REFUSED_STREAM) { + refusedStreamCount++; + } + // On HTTP/2 stream errors, retry REFUSED_STREAM errors once on the same connection. All + // other errors must be retried on a new connection. + if (streamResetException.errorCode != ErrorCode.REFUSED_STREAM || refusedStreamCount > 1) { + noNewStreams = true; + route = null; + } + } else if (connection != null && !connection.isMultiplexed()) { + noNewStreams = true; + + // If this route hasn't completed a call, avoid it for new connections. + if (connection.successCount == 0) { + if (route != null && e != null) { + routeSelector.connectFailed(route, e); + } + route = null; } - route = null; } } - deallocate(true, false, true); + + deallocate(noNewStreams, false, true); } /** @@ -309,51 +326,8 @@ private void release(RealConnection connection) { throw new IllegalStateException(); } - public boolean recover(IOException e, boolean routeException, Sink requestBodyOut) { - if (connection != null) { - connectionFailed(e); - } - - boolean canRetryRequestBody = requestBodyOut == null || requestBodyOut instanceof RetryableSink; - if ((routeSelector != null && !routeSelector.hasNext()) // No more routes to attempt. - || !isRecoverable(e, routeException) - || !canRetryRequestBody) { - return false; - } - - return true; - } - - private boolean isRecoverable(IOException e, boolean routeException) { - // If there was a protocol problem, don't recover. - if (e instanceof ProtocolException) { - return false; - } - - // If there was an interruption don't recover, but if there was a timeout connecting to a route - // we should try the next route (if there is one). - if (e instanceof InterruptedIOException) { - return e instanceof SocketTimeoutException && routeException; - } - - // Look for known client-side or negotiation errors that are unlikely to be fixed by trying - // again with a different route. - if (e instanceof SSLHandshakeException) { - // If the problem was a CertificateException from the X509TrustManager, - // do not retry. - if (e.getCause() instanceof CertificateException) { - return false; - } - } - if (e instanceof SSLPeerUnverifiedException) { - // e.g. a certificate pinning error. - return false; - } - - // An example of one we might want to retry with a different route is a problem connecting to a - // proxy and would manifest as a standard IOException. Unless it is one we know we should not - // retry, we return true and try a new route. - return true; + public boolean hasMoreRoutes() { + return route != null || routeSelector.hasNext(); } @Override public String toString() {