Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,4 @@ build
*.iws
dependency-reduced-pom.xml
.gradle
# Local workspace files (not part of the repo)
CLIENT_DEMO.md
*.pptx
src/test/java/com/emc/object/demo/
video_tools/
test.properties
release-notes-*.md
7 changes: 0 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,6 @@ clean {
delete aggregatedDocsDir
}

task runDemo(type: JavaExec, dependsOn: testClasses) {
group = 'Demo'
description = 'Runs EcsClientDemo against a live ECS cluster (configure via test.properties)'
classpath = sourceSets.test.runtimeClasspath
mainClass = 'com.emc.object.demo.EcsClientDemo'
}

// allow typing in credentials
// note: this only works when run without the Gradle daemon (--no-daemon)
gradle.taskGraph.whenReady { taskGraph ->
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/com/emc/object/AbstractJerseyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ protected Response executeRequest(Client client, ObjectRequest request) {
InputStream is = (InputStream) entity;
InputStream buffered = is.markSupported() ? is : new BufferedInputStream(is, bufSize);
buffered.mark(bufSize);
try {
((EntityRequest) request).setEntity(buffered);
} catch (UnsupportedOperationException ignored) {
}
entityStream = buffered;
}
}
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/com/emc/object/EntityRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,4 @@ public interface EntityRequest {
Long getContentLength();

boolean isChunkable();

default void setEntity(Object entity) {
throw new UnsupportedOperationException("setEntity not supported on this request type");
}
}
7 changes: 2 additions & 5 deletions src/main/java/com/emc/object/s3/S3SignerV4.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,8 @@ protected SortedMap<String, String> getCanonicalizedHeaders(Map<String, List<Obj
for (String header : headers.keySet()) {
String lcHeader = header.toLowerCase();
// Only sign headers that are guaranteed to arrive at the server unchanged.
// HTTP connectors (e.g. HttpUrlConnectorProvider) may modify standard headers
// such as Content-Length, User-Agent, Accept, and Transfer-Encoding after
// signing, which causes V4 signature mismatches. Following AWS SDK conventions,
// we sign: host (required), content-type, content-md5, and all x-amz-/x-emc-
// prefixed headers.
// Following AWS SDK conventions, we sign: host (required), content-type,
// content-md5, and all x-amz-/x-emc- prefixed headers.
if (lcHeader.equals("host") || lcHeader.equals("content-type") || lcHeader.equals("content-md5")
|| lcHeader.startsWith(S3Constants.AMZ_PREFIX) || lcHeader.startsWith(RestUtil.EMC_PREFIX)) {
canonicalizedHeaders.put(lcHeader, trimAndJoin(headers.get(header), ","));
Expand Down
7 changes: 0 additions & 7 deletions src/main/java/com/emc/object/s3/jersey/ChecksumFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@

import com.emc.object.s3.S3Config;
import com.emc.object.s3.S3Signer;
import com.emc.object.s3.S3SignerV2;
import com.emc.object.s3.S3SignerV4;
import com.emc.object.util.ChecksumAlgorithm;
import com.emc.object.util.ChecksumError;
import com.emc.object.util.ChecksumValueImpl;
Expand All @@ -56,14 +54,9 @@ public class ChecksumFilter implements WriterInterceptor, ClientResponseFilter {
private static final String PROP_WRITE_CHECKSUM = "com.emc.object.checksumFilter.writeChecksum";

private S3Config s3Config;
private S3Signer signer;

public ChecksumFilter(S3Config s3Config) {
this.s3Config = s3Config;
if(s3Config.isUseV2Signer())
this.signer = new S3SignerV2(s3Config);
else
this.signer = new S3SignerV4(s3Config);
}

@Override
Expand Down
42 changes: 7 additions & 35 deletions src/main/java/com/emc/object/s3/jersey/CodecFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -48,6 +50,7 @@

import com.emc.codec.CodecChain;
import com.emc.object.s3.S3ObjectMetadata;
import com.emc.object.s3.S3Signer;
import com.emc.object.util.RestUtil;
import com.emc.rest.smart.jersey.SizeOverrideWriter;

Expand Down Expand Up @@ -89,11 +92,6 @@ public void aroundWriteTo(WriterInterceptorContext context) throws IOException {
Map<String, String> metaBackup = new HashMap<>(userMeta);
context.setProperty("com.emc.object.codecFilter.metaBackup", metaBackup);

// we need pre-stream metadata from the encoder, but we don't have the entity output stream, so we'll use
// a "dangling" output stream and connect it in the interceptor
DanglingOutputStream danglingStream = new DanglingOutputStream();
OutputStream encodeStream = encodeChain.getEncodeStream(danglingStream, userMeta);

// NOTE: do NOT add encode metadata to context.getHeaders() here.
// With HttpUrlConnectorProvider, Jersey's CommittingOutputStream defers the
// setOutboundHeaders() callback until the first byte is written. If we add
Expand Down Expand Up @@ -132,7 +130,8 @@ private void stripContentLength() {
}
}
};
danglingStream.setOutputStream(safeOut);

OutputStream encodeStream = encodeChain.getEncodeStream(safeOut, userMeta);
context.setOutputStream(encodeStream);

try {
Expand All @@ -146,10 +145,9 @@ private void stripContentLength() {
// make sure we clear the content-length override for this thread if we set it
SizeOverrideWriter.setEntitySize(null);
}
return;
} else {
context.proceed();
}

context.proceed();
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -222,30 +220,4 @@ public CodecFilter withCodecProperties(Map<String, Object> codecProperties) {
return this;
}

private static class DanglingOutputStream extends FilterOutputStream {
private static final OutputStream BOGUS_STREAM = new OutputStream() {
@Override
public void write(int b) throws IOException {
throw new RuntimeException("you didn't connect a dangling output stream!");
}
};

DanglingOutputStream() {
super(BOGUS_STREAM);
}

void setOutputStream(OutputStream out) {
this.out = out;
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}

@Override
public void write(int b) throws IOException {
throw new UnsupportedOperationException("single-byte write called!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import java.net.URL;
import java.util.Map;

import org.glassfish.jersey.client.spi.ConnectorProvider;

import com.emc.codec.CodecChain;
import com.emc.codec.encryption.DoesNotNeedRekeyException;
import com.emc.codec.encryption.EncryptionCodec;
Expand Down Expand Up @@ -123,11 +121,7 @@ public class S3EncryptionClient extends S3JerseyClient {
private EncryptionConfig encryptionConfig;

public S3EncryptionClient(S3Config s3Config, EncryptionConfig encryptionConfig) {
this(s3Config, null, encryptionConfig);
}

public S3EncryptionClient(S3Config s3Config, ConnectorProvider connectorProvider, EncryptionConfig encryptionConfig) {
super(s3Config, connectorProvider);
super(s3Config);
this.encryptionConfig = encryptionConfig;

// create an encode chain based on parameters
Expand Down
41 changes: 3 additions & 38 deletions src/main/java/com/emc/object/s3/jersey/S3JerseyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import com.emc.rest.smart.jersey.SmartClientFactory;
import com.emc.rest.smart.jersey.SmartFilter;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.HttpUrlConnectorProvider;
import org.glassfish.jersey.client.spi.ConnectorProvider;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
Expand Down Expand Up @@ -117,13 +115,7 @@
* <pre>
* System.setProperty(ReaderWriter.BUFFER_SIZE_SYSTEM_PROPERTY, "" + 128 * 1024); // 128k
* </pre>
* You can also try using Jersey's HttpUrlConnectorProvider, but be aware that this connector does not support
* <code>Expect: 100-Continue</code> behavior if that is important to you. You should also increase
* <code>http.maxConnections</code> to match your thread count.
* <pre>
* System.setProperty("http.maxConnections", "" + 32); // if you have 32 threads
* S3Client s3Client = new S3JerseyClient(configX, new HttpUrlConnectorProvider());
* </pre>
* This client always uses the Apache HTTP connector.
*/
public class S3JerseyClient extends AbstractJerseyClient implements S3Client {

Expand All @@ -136,26 +128,7 @@ public class S3JerseyClient extends AbstractJerseyClient implements S3Client {
protected RetryFilter retryFilter;

public S3JerseyClient(S3Config s3Config) {
this(s3Config, null);
}

/**
* Provide a specific Jersey {@link ConnectorProvider} implementation (default is
* {@link org.glassfish.jersey.apache.connector.ApacheConnectorProvider}). If you experience
* performance problems, you might try using
* {@link org.glassfish.jersey.client.HttpUrlConnectorProvider}, but note that it will not support the
* Expect: 100-Continue header and upload size is limited to 2GB. Also note that when using that
* provider, you should set the "http.maxConnections" system property to match your thread count
* (default is only 5).
*/
public S3JerseyClient(S3Config s3Config, ConnectorProvider connectorProvider) {
super(new S3Config(s3Config)); // deep-copy config so that two clients don't share the same host lists (SDK-122)
// HttpURLConnection restricts certain headers (Host, Content-Length, etc.) by default.
// V4 signing requires the Host header to pass through unchanged, so we must allow
// restricted headers when using HttpUrlConnectorProvider.
if (connectorProvider instanceof HttpUrlConnectorProvider) {
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
}
this.s3Config = (S3Config) super.getObjectConfig();
if (this.s3Config.isUseV2Signer())
this.signer = new S3SignerV2(this.s3Config);
Expand All @@ -169,11 +142,7 @@ public S3JerseyClient(S3Config s3Config, ConnectorProvider connectorProvider) {
smartConfig.setProperty(ClientProperties.CHUNKED_ENCODING_SIZE, this.s3Config.getChunkedEncodingSize());

// creates a standard (non-load-balancing) jersey client
if (connectorProvider == null) {
client = SmartClientFactory.createStandardClient(smartConfig);
} else {
client = SmartClientFactory.createStandardClient(smartConfig, connectorProvider);
}
client = SmartClientFactory.createStandardClient(smartConfig);

if (this.s3Config.isSmartClient()) {
// SMART CLIENT SETUP
Expand Down Expand Up @@ -208,11 +177,7 @@ public S3JerseyClient(S3Config s3Config, ConnectorProvider connectorProvider) {

// S.C. - CLIENT CREATION
// create a load-balancing jersey client
if (connectorProvider == null) {
client = SmartClientFactory.createSmartClient(smartConfig);
} else {
client = SmartClientFactory.createSmartClient(smartConfig, connectorProvider);
}
client = SmartClientFactory.createSmartClient(smartConfig);
}

// In Jersey 2.x, filters are registered on the client (order matters for request filters:
Expand Down
9 changes: 0 additions & 9 deletions src/main/java/com/emc/object/s3/request/PutObjectRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,6 @@ public Object getObject() {
return object;
}

public void setObject(Object object) {
this.object = object;
}

@Override
public void setEntity(Object entity) {
setObject(entity);
}

public Range getRange() {
return range;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ public void setObject(Object object) {
this.object = object;
}

@Override
public void setEntity(Object entity) {
setObject(entity);
}

public void setContentLength(Long contentLength) {
this.contentLength = contentLength;
}
Expand Down
13 changes: 7 additions & 6 deletions src/test/java/com/emc/object/s3/ExtendedConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import javax.ws.rs.client.Client;

import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -46,25 +47,25 @@ private S3Config loadTestConfig() throws IOException {
return s3Config;
}

// NOTE: In Jersey 2.x with Apache connector, connection pool settings are configured
// through SmartClientFactory which sets up the Apache HttpClient 5.x connection manager.
// This test verifies that a custom connection limit can be set via SmartConfig properties.
@Test
public void testApacheConnectionLimit() throws IOException {
S3Config s3Config = loadTestConfig();

int connectionLimitPerHost = 4; // non-default number
int connectionLimitTotal = 39; // non-default number

// In Jersey 2.x, connection limits are set via SmartConfig properties
s3Config.setProperty(SmartClientFactory.MAX_CONNECTIONS_PER_HOST, connectionLimitPerHost);
s3Config.setProperty(SmartClientFactory.MAX_CONNECTIONS, connectionLimitTotal);

TestS3JerseyClient s3Client = new TestS3JerseyClient(s3Config);

// verify the client was created successfully with custom config
// verify actual Apache connection pool settings were applied
Client jerseyClient = s3Client.getClient();
Assert.assertNotNull(jerseyClient);
PoolingHttpClientConnectionManager cm = (PoolingHttpClientConnectionManager)
jerseyClient.getConfiguration().getProperty(SmartClientFactory.CONNECTION_MANAGER_PROPERTY_KEY);
Assert.assertNotNull("Apache connection manager not found in client config", cm);
Assert.assertEquals(connectionLimitPerHost, cm.getDefaultMaxPerRoute());
Assert.assertEquals(connectionLimitTotal, cm.getMaxTotal());
}

static class TestS3JerseyClient extends S3JerseyClient {
Expand Down
46 changes: 30 additions & 16 deletions src/test/java/com/emc/object/s3/GeoPinningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,28 +148,42 @@ public void testVdcDistribution() {

@Test
public void testReadRetryFailoverInFilter() throws Exception {
S3Config s3ConfigF = new S3Config(createS3Config());
// create a separate client with geo-read-retry-failover enabled
S3Config s3ConfigF = createS3Config();
s3ConfigF.setGeoReadRetryFailover(true);
S3JerseyClient failoverClient = new S3JerseyClient(s3ConfigF);
Thread.sleep(500); // wait for polling daemon

String bucket = "foo";
String key = "my/object/key";
int geoIndex = 0xbb8619 % vdcs.size();
try {
String key = "my/object/key";
int geoIndex = 0xbb8619 % vdcs.size();

// In Jersey 2.x, we test geo-pinning index calculation directly
// since we can't easily construct mock ClientRequestContext
int geoPinIndex = GeoPinningUtil.getGeoPinIndex(GeoPinningUtil.getGeoId(bucket, key), vdcs.size());
Assert.assertEquals(geoIndex, geoPinIndex);
// write the test object
failoverClient.putObject(getTestBucket(), key, "Hello GeoPinning!", "text/plain");

// test retry failover indices
int retryIndex1 = (geoIndex + 1) % vdcs.size();
Assert.assertNotEquals(geoIndex, retryIndex1);
LoadBalancer loadBalancer = failoverClient.getLoadBalancer();
loadBalancer.resetStats();

int retryIndex2 = (geoIndex + 2) % vdcs.size();
Assert.assertNotEquals(geoIndex, retryIndex2);
// read the object 10 times — should all route to the geo-pinned VDC
for (int i = 0; i < 10; i++) {
failoverClient.readObject(getTestBucket(), key, String.class);
}

// test 3rd retry (we have 3 VDCs, so this should go back to the primary)
int retryIndex3 = (geoIndex + 3) % vdcs.size();
Assert.assertEquals(geoIndex, retryIndex3);
// verify no errors and total count
Assert.assertEquals(0, loadBalancer.getTotalErrors());
Assert.assertEquals(10, loadBalancer.getTotalConnections());

// verify reads are routed to the correct VDC
for (HostStats stats : loadBalancer.getHostStats()) {
if (vdcs.get(geoIndex).equals(((VdcHost) stats).getVdc())) {
Assert.assertTrue(stats.getTotalConnections() > 0);
} else {
Assert.assertEquals(0, stats.getTotalConnections());
}
}
} finally {
failoverClient.destroy();
}
}

protected void testKeyDistribution(String key, int vdcIndex) {
Expand Down
Loading