Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public final class MockResponse implements Cloneable {
private TimeUnit throttlePeriodUnit = TimeUnit.SECONDS;

private SocketPolicy socketPolicy = SocketPolicy.KEEP_OPEN;
private int http2ErrorCode = -1;

private long bodyDelayAmount = 0;
private TimeUnit bodyDelayUnit = TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -205,6 +206,20 @@ public MockResponse setSocketPolicy(SocketPolicy socketPolicy) {
return this;
}

public int getHttp2ErrorCode() {
return http2ErrorCode;
}

/**
* Sets the <a href="https://tools.ietf.org/html/rfc7540#section-7">HTTP/2 error code</a> to be
* returned when resetting the stream. This is only valid with {@link
* SocketPolicy#RESET_STREAM_AT_START}.
*/
public MockResponse setHttp2ErrorCode(int http2ErrorCode) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"FramedErrorCode"?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+Javadoc explaining that this is used only for SocketPolicy. RESET_STREAM_AT_START

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is defined by HTTP/2. Definitely needs doc. Added.

this.http2ErrorCode = http2ErrorCode;
return this;
}

/**
* Throttles the request reader and response writer to sleep for the given period after each
* series of {@code bytesPerPeriod} bytes are transferred. Use this to simulate network behavior.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package okhttp3.mockwebserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
Expand Down Expand Up @@ -88,6 +89,7 @@
import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY;
import static okhttp3.mockwebserver.SocketPolicy.FAIL_HANDSHAKE;
import static okhttp3.mockwebserver.SocketPolicy.NO_RESPONSE;
import static okhttp3.mockwebserver.SocketPolicy.RESET_STREAM_AT_START;
import static okhttp3.mockwebserver.SocketPolicy.SHUTDOWN_INPUT_AT_END;
import static okhttp3.mockwebserver.SocketPolicy.SHUTDOWN_OUTPUT_AT_END;
import static okhttp3.mockwebserver.SocketPolicy.UPGRADE_TO_SSL_AT_END;
Expand Down Expand Up @@ -562,8 +564,11 @@ private void processHandshakeFailure(Socket raw) throws Exception {

private void dispatchBookkeepingRequest(int sequenceNumber, Socket socket)
throws InterruptedException {
RecordedRequest request = new RecordedRequest(
null, null, null, -1, null, sequenceNumber, socket);
requestCount.incrementAndGet();
dispatcher.dispatch(new RecordedRequest(null, null, null, -1, null, sequenceNumber, socket));
requestQueue.add(request);
dispatcher.dispatch(request);
}

/** @param sequenceNumber the index of this request on this connection. */
Expand Down Expand Up @@ -843,7 +848,19 @@ private FramedSocketHandler(Socket socket, Protocol protocol) {
}

@Override public void onStream(FramedStream stream) throws IOException {
MockResponse peekedResponse = dispatcher.peek();
if (peekedResponse.getSocketPolicy() == RESET_STREAM_AT_START) {
try {
dispatchBookkeepingRequest(sequenceNumber.getAndIncrement(), socket);
stream.close(ErrorCode.fromHttp2(peekedResponse.getHttp2ErrorCode()));
return;
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

RecordedRequest request = readRequest(stream);
requestCount.incrementAndGet();
requestQueue.add(request);
MockResponse response;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,11 @@ public enum SocketPolicy {
* Don't respond to the request but keep the socket open. For testing read response header timeout
* issue.
*/
NO_RESPONSE
NO_RESPONSE,

/**
* Fail HTTP/2 requests without processing them by sending an {@linkplain
* MockResponse#getHttp2ErrorCode() HTTP/2 error code}.
*/
RESET_STREAM_AT_START
}
9 changes: 6 additions & 3 deletions okhttp-tests/src/test/java/okhttp3/URLConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,12 @@ private void doUpload(TransferKind uploadKind, WriteKind writeKind) throws Excep

assertContent("this response comes via SSL", connection);

RecordedRequest request = server.takeRequest();
assertEquals("GET /foo HTTP/1.1", request.getRequestLine());
assertEquals(TlsVersion.TLS_1_0, request.getTlsVersion());
RecordedRequest failHandshakeRequest = server.takeRequest();
assertNull(failHandshakeRequest.getRequestLine());

RecordedRequest fallbackRequest = server.takeRequest();
assertEquals("GET /foo HTTP/1.1", fallbackRequest.getRequestLine());
assertEquals(TlsVersion.TLS_1_0, fallbackRequest.getTlsVersion());
}

@Test public void connectViaHttpsWithSSLFallbackFailuresRecorded() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.RecordingCookieJar;
import okhttp3.RecordingHostnameVerifier;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.internal.DoubleInetAddressDns;
import okhttp3.internal.RecordingOkAuthenticator;
import okhttp3.internal.SingleInetAddressDns;
import okhttp3.internal.SslContextBuilder;
import okhttp3.internal.Util;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import okhttp3.mockwebserver.SocketPolicy;
import okhttp3.RecordingHostnameVerifier;
import okio.Buffer;
import okio.BufferedSink;
import okio.GzipSink;
Expand Down Expand Up @@ -86,6 +88,7 @@ protected HttpOverSpdyTest(Protocol protocol) {
cache = new Cache(tempDir.getRoot(), Integer.MAX_VALUE);
client = new OkHttpClient.Builder()
.protocols(Arrays.asList(protocol, Protocol.HTTP_1_1))
.dns(new SingleInetAddressDns())
.sslSocketFactory(sslContext.getSocketFactory())
.hostnameVerifier(hostnameVerifier)
.build();
Expand Down Expand Up @@ -582,6 +585,99 @@ protected HttpOverSpdyTest(Protocol protocol) {
assertEquals(0, server.takeRequest().getSequenceNumber());
}

@Test public void recoverFromOneRefusedStreamReusesConnection() throws Exception {
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode));
server.enqueue(new MockResponse()
.setBody("abc"));

Call call = client.newCall(new Request.Builder()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if you use the SingleInetAddressDns here? I was testing with nginx 1.9.15 and I noticed the refuse stream stack trace. I think it's because I was using the IP address directly, but still investigating.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You saw something I didn’t see! I turned that on and of course the test started to fail. I fixed it by changing hasMoreRoutes() to return true if the current route is still valid.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, gotcha. I couldn't get those tests to run in intellij, but just got it working by upgrading my JDK version.

.url(server.url("/"))
.build());
Response response = call.execute();
assertEquals("abc", response.body().string());

assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(1, server.takeRequest().getSequenceNumber()); // Reused connection.
}

@Test public void recoverFromOneInternalErrorRequiresNewConnection() throws Exception {
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(ErrorCode.INTERNAL_ERROR.httpCode));
server.enqueue(new MockResponse()
.setBody("abc"));

client = client.newBuilder()
.dns(new DoubleInetAddressDns())
.build();

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response = call.execute();
assertEquals("abc", response.body().string());

assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
}

@Test public void recoverFromMultipleRefusedStreamsRequiresNewConnection() throws Exception {
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode));
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(ErrorCode.REFUSED_STREAM.httpCode));
server.enqueue(new MockResponse()
.setBody("abc"));

client = client.newBuilder()
.dns(new DoubleInetAddressDns())
.build();

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
Response response = call.execute();
assertEquals("abc", response.body().string());

assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
assertEquals(1, server.takeRequest().getSequenceNumber()); // Reused connection.
assertEquals(0, server.takeRequest().getSequenceNumber()); // New connection.
}

@Test public void noRecoveryFromRefusedStreamWithRetryDisabled() throws Exception {
noRecoveryFromErrorWithRetryDisabled(ErrorCode.REFUSED_STREAM);
}

@Test public void noRecoveryFromInternalErrorWithRetryDisabled() throws Exception {
noRecoveryFromErrorWithRetryDisabled(ErrorCode.INTERNAL_ERROR);
}

private void noRecoveryFromErrorWithRetryDisabled(ErrorCode errorCode) throws Exception {
server.enqueue(new MockResponse()
.setSocketPolicy(SocketPolicy.RESET_STREAM_AT_START)
.setHttp2ErrorCode(errorCode.httpCode));
server.enqueue(new MockResponse()
.setBody("abc"));

client = client.newBuilder()
.retryOnConnectionFailure(false)
.build();

Call call = client.newCall(new Request.Builder()
.url(server.url("/"))
.build());
try {
call.execute();
fail();
} catch (StreamResetException expected) {
assertEquals(errorCode, expected.errorCode);
}
}

public Buffer gzip(String bytes) throws IOException {
Buffer bytesOut = new Buffer();
BufferedSink sink = Okio.buffer(new GzipSink(bytesOut));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public synchronized List<Header> getResponseHeaders() throws IOException {
readTimeout.exitAndThrowIfTimedOut();
}
if (responseHeaders != null) return responseHeaders;
throw new IOException("stream was reset: " + errorCode);
throw new StreamResetException(errorCode);
}

/**
Expand Down Expand Up @@ -438,7 +438,7 @@ private void checkNotClosed() throws IOException {
throw new IOException("stream closed");
}
if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
throw new StreamResetException(errorCode);
}
}
}
Expand Down Expand Up @@ -571,7 +571,7 @@ private void checkOutNotClosed() throws IOException {
} else if (sink.finished) {
throw new IOException("stream finished");
} else if (errorCode != null) {
throw new IOException("stream was reset: " + errorCode);
throw new StreamResetException(errorCode);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3.internal.framed;

import java.io.IOException;

/** Thrown when an HTTP/2 stream is canceled without damage to the socket that carries it. */
public final class StreamResetException extends IOException {
public final ErrorCode errorCode;

public StreamResetException(ErrorCode errorCode) {
super("stream was reset: " + errorCode);
this.errorCode = errorCode;
}
}
57 changes: 52 additions & 5 deletions okhttp/src/main/java/okhttp3/internal/http/HttpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
package okhttp3.internal.http;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.net.Proxy;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import java.util.Date;
import java.util.List;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSocketFactory;
import okhttp3.Address;
import okhttp3.CertificatePinner;
Expand Down Expand Up @@ -352,12 +357,22 @@ public Connection getConnection() {
* permanent. Requests with a body can only be recovered if the body is buffered.
*/
public HttpEngine recover(IOException e, boolean routeException, Sink requestBodyOut) {
if (!streamAllocation.recover(e, routeException, requestBodyOut)) {
return null;
}
streamAllocation.streamFailed(e);

if (!client.retryOnConnectionFailure()) {
return null;
return null; // The application layer has forbidden retries.
}

if (requestBodyOut != null && !(requestBodyOut instanceof RetryableSink)) {
return null; // The body on this request cannot be retried.
}

if (!isRecoverable(e, routeException)) {
return null; // This exception is fatal.
}

if (!streamAllocation.hasMoreRoutes()) {
return null; // No more routes to attempt.
}

StreamAllocation streamAllocation = close();
Expand All @@ -371,6 +386,38 @@ public HttpEngine recover(IOException e, boolean routeException) {
return recover(e, routeException, requestBodyOut);
}

private boolean isRecoverable(IOException e, boolean routeException) {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This code is just moved)

// If there was a protocol problem, don't recover.
if (e instanceof ProtocolException) {
return false;
}

// If there was an interruption don't recover, but if there was a timeout connecting to a route
// we should try the next route (if there is one).
if (e instanceof InterruptedIOException) {
return e instanceof SocketTimeoutException && routeException;
}

// Look for known client-side or negotiation errors that are unlikely to be fixed by trying
// again with a different route.
if (e instanceof SSLHandshakeException) {
// If the problem was a CertificateException from the X509TrustManager,
// do not retry.
if (e.getCause() instanceof CertificateException) {
return false;
}
}
if (e instanceof SSLPeerUnverifiedException) {
// e.g. a certificate pinning error.
return false;
}

// An example of one we might want to retry with a different route is a problem connecting to a
// proxy and would manifest as a standard IOException. Unless it is one we know we should not
// retry, we return true and try a new route.
return true;
}

private void maybeCache() throws IOException {
InternalCache responseCache = Internal.instance.internalCache(client);
if (responseCache == null) return;
Expand Down Expand Up @@ -428,7 +475,7 @@ public StreamAllocation close() {
closeQuietly(userResponse.body());
} else {
// If this engine never achieved a response body, its stream allocation is dead.
streamAllocation.connectionFailed(null);
streamAllocation.streamFailed(null);
}

return streamAllocation;
Expand Down
Loading