Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ dependencies {
implementation 'org.slf4j:slf4j-api:2.0.16'
testImplementation 'org.junit.jupiter:junit-jupiter:5.11.4'
testImplementation 'org.junit.vintage:junit-vintage-engine:5.11.4'
testImplementation 'org.mockito:mockito-core:5.14.2'
testImplementation 'org.mockito:mockito-junit-jupiter:5.14.2'
testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.4.1'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testRuntimeOnly 'org.slf4j:jcl-over-slf4j:2.0.16'
Expand Down
97 changes: 86 additions & 11 deletions src/main/java/com/emc/object/AbstractJerseyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
package com.emc.object;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Map;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
Expand All @@ -42,13 +44,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.emc.object.s3.S3Exception;
import com.emc.object.util.RestUtil;
import com.emc.rest.smart.jersey.SizeOverrideWriter;

public abstract class AbstractJerseyClient {

private static final Logger log = LoggerFactory.getLogger(AbstractJerseyClient.class);

public static final String PROP_RETRY_COUNT = "com.emc.object.retryCount";

protected ObjectConfig objectConfig;

protected AbstractJerseyClient(ObjectConfig objectConfig) {
Expand All @@ -62,21 +67,42 @@ protected Response executeAndClose(Client client, ObjectRequest request) {
}

/**
* Override in subclasses that support retry to return a configured {@link com.emc.object.s3.jersey.RetryFilter}.
* Default is {@code null} (no retry).
* Override in subclasses that support retry. Default is {@code false} (no retry).
*/
protected boolean isRetryEnabled() {
return false;
}

/**
* Override in subclasses to provide the maximum number of retries.
*/
protected int getRetryLimit() {
return 0;
}

/**
* Override in subclasses to provide the initial retry delay in milliseconds.
* Exponential backoff is applied: delay = initialRetryDelay * 2^(retryCount-1).
*/
protected int getInitialRetryDelay() {
return 0;
}

/**
* Override in subclasses to provide the buffer size for marking input streams during retry.
*/
protected com.emc.object.s3.jersey.RetryFilter getRetryFilter() {
return null;
protected int getRetryBufferSize() {
return 0;
}

@SuppressWarnings("unchecked")
protected Response executeRequest(Client client, ObjectRequest request) {
com.emc.object.s3.jersey.RetryFilter retryFilter = getRetryFilter();
boolean retryEnabled = isRetryEnabled();
InputStream entityStream = null;
if (retryFilter != null && request instanceof EntityRequest) {
if (retryEnabled && request instanceof EntityRequest) {
Object entity = ((EntityRequest) request).getEntity();
if (entity instanceof InputStream) {
int bufSize = retryFilter.getRetryBufferSize();
int bufSize = getRetryBufferSize();
InputStream is = (InputStream) entity;
InputStream buffered = is.markSupported() ? is : new BufferedInputStream(is, bufSize);
buffered.mark(bufSize);
Expand All @@ -88,14 +114,63 @@ protected Response executeRequest(Client client, ObjectRequest request) {
try {
return unwrapAndExecute(client, request);
} catch (RuntimeException orig) {
if (retryFilter == null) throw orig;
if (!retryEnabled) throw orig;
retryCount++;
// stash retry count so GeoPinningFilter can fail over on reads
request.property(com.emc.object.s3.jersey.RetryFilter.PROP_RETRY_COUNT, retryCount);
// shouldRetry throws the original exception if retries are exhausted or not retryable
retryFilter.shouldRetry(orig, retryCount, entityStream);
request.property(PROP_RETRY_COUNT, retryCount);
// checkRetry throws the original exception if retries are exhausted or not retryable
checkRetry(orig, retryCount, entityStream);
}
}
}

/**
* Checks whether the given exception is retryable and handles retry logic including
* exponential backoff. Throws the original exception if retries are exhausted or the
* error is not retryable.
*/
private void checkRetry(RuntimeException orig, int retryCount, InputStream entityStream) {
Throwable t = orig;

// in this case, the exception was wrapped by Jersey
if (t instanceof ProcessingException) t = t.getCause();

if (t instanceof S3Exception) {
S3Exception se = (S3Exception) t;

// retry all 50x errors except 501 (not implemented)
if (se.getHttpCode() < 500 || se.getHttpCode() == 501) throw orig;

// retry all IO exceptions
} else if (!(t instanceof IOException)) throw orig;

// only retry retryLimit times
if (retryCount > getRetryLimit()) throw orig;

// attempt to reset InputStream
if (entityStream != null) {
try {
if (!entityStream.markSupported()) throw new IOException("stream does not support mark/reset");
entityStream.reset();
} catch (IOException e) {
log.warn("could not reset entity stream for retry: " + e);
throw orig;
}
}

// wait for retry delay with exponential backoff
int initialDelay = getInitialRetryDelay();
if (initialDelay > 0) {
int retryDelay = initialDelay * (int) Math.pow(2, retryCount - 1);
try {
log.debug("waiting {}ms before retry", retryDelay);
Thread.sleep(retryDelay);
} catch (InterruptedException e) {
log.warn("interrupted while waiting to retry: " + e.getMessage());
}
}

log.info("error received in response [{}], retrying ({} of {})...", new Object[] { t, retryCount, getRetryLimit() });
}

private Response unwrapAndExecute(Client client, ObjectRequest request) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/emc/object/s3/S3Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public Host resolveHost() {

@Override
public SmartConfig toSmartConfig() {
// disable Apache client's retry - retries will be governed by our config and RetryFilter
// disable Apache client's retry - retries will be governed by our config and AbstractJerseyClient
return super.toSmartConfig().withProperty(SmartClientFactory.DISABLE_APACHE_RETRY, Boolean.TRUE);
}

Expand Down
61 changes: 22 additions & 39 deletions src/main/java/com/emc/object/s3/jersey/CodecFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
*/
package com.emc.object.s3.jersey;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
Expand Down Expand Up @@ -92,46 +91,30 @@ public void aroundWriteTo(WriterInterceptorContext context) throws IOException {
Map<String, String> metaBackup = new HashMap<>(userMeta);
context.setProperty("com.emc.object.codecFilter.metaBackup", metaBackup);

// NOTE: do NOT add encode metadata to context.getHeaders() here.
// With HttpUrlConnectorProvider, Jersey's CommittingOutputStream defers the
// setOutboundHeaders() callback until the first byte is written. If we add
// x-amz-meta-* headers to context.getHeaders() now, they will be copied to
// the HttpURLConnection and sent to the server, even though they were not
// present when the request was signed — causing a V2/V4 signature mismatch.
// The metadata stored in userMeta is sent via the subsequent CopyObject
// metadata-update request instead (see S3EncryptionClient.putObject).

// connect the dangling stream and wrap the output.
// Wrap with a stream that strips Content-Length before the first write.
// SizeOverrideWriter.writeTo() adds Content-Length to context.getHeaders()
// (possibly -1 for unpredictable sizes). With allowRestrictedHeaders=true
// (needed for V4 Host header), this leaks through HttpURLConnection's
// setOutboundHeaders alongside Transfer-Encoding: chunked, which is invalid
// and can cause server-side issues. Stripping it here ensures only chunked
// encoding is used for encrypted uploads.
OutputStream originalOut = context.getOutputStream();
final MultivaluedMap<String, Object> headers = context.getHeaders();
OutputStream safeOut = new FilterOutputStream(originalOut) {
private boolean cleaned = false;
@Override
public void write(int b) throws IOException {
stripContentLength();
out.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
stripContentLength();
out.write(b, off, len);
}
private void stripContentLength() {
if (!cleaned) {
cleaned = true;
headers.remove("Content-Length");
}
}
};
OutputStream encodeStream = encodeChain.getEncodeStream(originalOut, userMeta);

// add pre-stream encode metadata (IV, DEK, etc.) to the outbound headers.
// This ensures the initial PUT has encryption headers as a safety net: if the
// write succeeds but the subsequent copy-update fails, the object is still
// decryptable.
context.getHeaders().putAll(S3ObjectMetadata.getUmdHeaders(userMeta));

// re-sign: the encode metadata headers (x-amz-meta-*) are part of the V2/V4
// signed canonical headers, so the signature computed by AuthorizationFilter
// is now stale. Use the stashed signer to recompute (same pattern as ChecksumFilter).
S3Signer stashedSigner = (S3Signer) context.getProperty(AuthorizationFilter.PROP_SIGNER);
if (stashedSigner != null) {
String method = (String) context.getProperty(AuthorizationFilter.PROP_SIGN_METHOD);
URI uri = (URI) context.getProperty(AuthorizationFilter.PROP_SIGN_URI);
String resource = (String) context.getProperty(AuthorizationFilter.PROP_SIGN_RESOURCE);
@SuppressWarnings("unchecked")
Map<String, String> parameters = (Map<String, String>) context.getProperty(AuthorizationFilter.PROP_SIGN_PARAMETERS);
@SuppressWarnings({"unchecked", "rawtypes"})
Map<String, List<Object>> signingHeaders = (Map) context.getHeaders();
stashedSigner.resign(method, uri, resource, parameters, signingHeaders);
}

OutputStream encodeStream = encodeChain.getEncodeStream(safeOut, userMeta);
context.setOutputStream(encodeStream);

try {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/emc/object/s3/jersey/GeoPinningFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.emc.object.AbstractJerseyClient;
import com.emc.object.Method;
import com.emc.object.ObjectConfig;
import com.emc.object.s3.S3Constants;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void filter(ClientRequestContext requestContext) throws IOException {

// if this is a read and failover for retries is requested, round-robin the VDCs for each retry
if (objectConfig.isGeoReadRetryFailover() && Method.GET.name().equalsIgnoreCase(requestContext.getMethod())) {
Integer retries = (Integer) requestContext.getProperty(RetryFilter.PROP_RETRY_COUNT);
Integer retries = (Integer) requestContext.getProperty(AbstractJerseyClient.PROP_RETRY_COUNT);
if (retries != null) {
int newIndex = (geoPinIndex + retries) % healthyVdcs.size();
log.info("geo-pin read retry #{}: failing over from primary VDC {} to VDC {}",
Expand Down
107 changes: 0 additions & 107 deletions src/main/java/com/emc/object/s3/jersey/RetryFilter.java

This file was deleted.

Loading