diff --git a/build.gradle b/build.gradle index 59d8eaf3..79899500 100644 --- a/build.gradle +++ b/build.gradle @@ -86,6 +86,8 @@ dependencies { implementation 'org.slf4j:slf4j-api:2.0.16' testImplementation 'org.junit.jupiter:junit-jupiter:5.11.4' testImplementation 'org.junit.vintage:junit-vintage-engine:5.11.4' + testImplementation 'org.mockito:mockito-core:5.14.2' + testImplementation 'org.mockito:mockito-junit-jupiter:5.14.2' testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.4.1' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testRuntimeOnly 'org.slf4j:jcl-over-slf4j:2.0.16' diff --git a/src/main/java/com/emc/object/AbstractJerseyClient.java b/src/main/java/com/emc/object/AbstractJerseyClient.java index 6699ec86..bd83cf0b 100644 --- a/src/main/java/com/emc/object/AbstractJerseyClient.java +++ b/src/main/java/com/emc/object/AbstractJerseyClient.java @@ -27,10 +27,12 @@ package com.emc.object; import java.io.BufferedInputStream; +import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.Map; +import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; import javax.ws.rs.client.Invocation; @@ -42,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.emc.object.s3.S3Exception; import com.emc.object.util.RestUtil; import com.emc.rest.smart.jersey.SizeOverrideWriter; @@ -49,6 +52,8 @@ public abstract class AbstractJerseyClient { private static final Logger log = LoggerFactory.getLogger(AbstractJerseyClient.class); + public static final String PROP_RETRY_COUNT = "com.emc.object.retryCount"; + protected ObjectConfig objectConfig; protected AbstractJerseyClient(ObjectConfig objectConfig) { @@ -62,21 +67,42 @@ protected Response executeAndClose(Client client, ObjectRequest request) { } /** - * Override in subclasses that support retry to return a configured {@link com.emc.object.s3.jersey.RetryFilter}. - * Default is {@code null} (no retry). + * Override in subclasses that support retry. Default is {@code false} (no retry). + */ + protected boolean isRetryEnabled() { + return false; + } + + /** + * Override in subclasses to provide the maximum number of retries. + */ + protected int getRetryLimit() { + return 0; + } + + /** + * Override in subclasses to provide the initial retry delay in milliseconds. + * Exponential backoff is applied: delay = initialRetryDelay * 2^(retryCount-1). + */ + protected int getInitialRetryDelay() { + return 0; + } + + /** + * Override in subclasses to provide the buffer size for marking input streams during retry. */ - protected com.emc.object.s3.jersey.RetryFilter getRetryFilter() { - return null; + protected int getRetryBufferSize() { + return 0; } @SuppressWarnings("unchecked") protected Response executeRequest(Client client, ObjectRequest request) { - com.emc.object.s3.jersey.RetryFilter retryFilter = getRetryFilter(); + boolean retryEnabled = isRetryEnabled(); InputStream entityStream = null; - if (retryFilter != null && request instanceof EntityRequest) { + if (retryEnabled && request instanceof EntityRequest) { Object entity = ((EntityRequest) request).getEntity(); if (entity instanceof InputStream) { - int bufSize = retryFilter.getRetryBufferSize(); + int bufSize = getRetryBufferSize(); InputStream is = (InputStream) entity; InputStream buffered = is.markSupported() ? is : new BufferedInputStream(is, bufSize); buffered.mark(bufSize); @@ -88,14 +114,63 @@ protected Response executeRequest(Client client, ObjectRequest request) { try { return unwrapAndExecute(client, request); } catch (RuntimeException orig) { - if (retryFilter == null) throw orig; + if (!retryEnabled) throw orig; retryCount++; // stash retry count so GeoPinningFilter can fail over on reads - request.property(com.emc.object.s3.jersey.RetryFilter.PROP_RETRY_COUNT, retryCount); - // shouldRetry throws the original exception if retries are exhausted or not retryable - retryFilter.shouldRetry(orig, retryCount, entityStream); + request.property(PROP_RETRY_COUNT, retryCount); + // checkRetry throws the original exception if retries are exhausted or not retryable + checkRetry(orig, retryCount, entityStream); + } + } + } + + /** + * Checks whether the given exception is retryable and handles retry logic including + * exponential backoff. Throws the original exception if retries are exhausted or the + * error is not retryable. + */ + private void checkRetry(RuntimeException orig, int retryCount, InputStream entityStream) { + Throwable t = orig; + + // in this case, the exception was wrapped by Jersey + if (t instanceof ProcessingException) t = t.getCause(); + + if (t instanceof S3Exception) { + S3Exception se = (S3Exception) t; + + // retry all 50x errors except 501 (not implemented) + if (se.getHttpCode() < 500 || se.getHttpCode() == 501) throw orig; + + // retry all IO exceptions + } else if (!(t instanceof IOException)) throw orig; + + // only retry retryLimit times + if (retryCount > getRetryLimit()) throw orig; + + // attempt to reset InputStream + if (entityStream != null) { + try { + if (!entityStream.markSupported()) throw new IOException("stream does not support mark/reset"); + entityStream.reset(); + } catch (IOException e) { + log.warn("could not reset entity stream for retry: " + e); + throw orig; + } + } + + // wait for retry delay with exponential backoff + int initialDelay = getInitialRetryDelay(); + if (initialDelay > 0) { + int retryDelay = initialDelay * (int) Math.pow(2, retryCount - 1); + try { + log.debug("waiting {}ms before retry", retryDelay); + Thread.sleep(retryDelay); + } catch (InterruptedException e) { + log.warn("interrupted while waiting to retry: " + e.getMessage()); } } + + log.info("error received in response [{}], retrying ({} of {})...", new Object[] { t, retryCount, getRetryLimit() }); } private Response unwrapAndExecute(Client client, ObjectRequest request) { diff --git a/src/main/java/com/emc/object/s3/S3Config.java b/src/main/java/com/emc/object/s3/S3Config.java index 34fdcc1b..735804cb 100644 --- a/src/main/java/com/emc/object/s3/S3Config.java +++ b/src/main/java/com/emc/object/s3/S3Config.java @@ -140,7 +140,7 @@ public Host resolveHost() { @Override public SmartConfig toSmartConfig() { - // disable Apache client's retry - retries will be governed by our config and RetryFilter + // disable Apache client's retry - retries will be governed by our config and AbstractJerseyClient return super.toSmartConfig().withProperty(SmartClientFactory.DISABLE_APACHE_RETRY, Boolean.TRUE); } diff --git a/src/main/java/com/emc/object/s3/jersey/CodecFilter.java b/src/main/java/com/emc/object/s3/jersey/CodecFilter.java index 7bb4c995..9b28e38f 100644 --- a/src/main/java/com/emc/object/s3/jersey/CodecFilter.java +++ b/src/main/java/com/emc/object/s3/jersey/CodecFilter.java @@ -26,7 +26,6 @@ */ package com.emc.object.s3.jersey; -import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; @@ -92,46 +91,30 @@ public void aroundWriteTo(WriterInterceptorContext context) throws IOException { Map metaBackup = new HashMap<>(userMeta); context.setProperty("com.emc.object.codecFilter.metaBackup", metaBackup); - // NOTE: do NOT add encode metadata to context.getHeaders() here. - // With HttpUrlConnectorProvider, Jersey's CommittingOutputStream defers the - // setOutboundHeaders() callback until the first byte is written. If we add - // x-amz-meta-* headers to context.getHeaders() now, they will be copied to - // the HttpURLConnection and sent to the server, even though they were not - // present when the request was signed — causing a V2/V4 signature mismatch. - // The metadata stored in userMeta is sent via the subsequent CopyObject - // metadata-update request instead (see S3EncryptionClient.putObject). - - // connect the dangling stream and wrap the output. - // Wrap with a stream that strips Content-Length before the first write. - // SizeOverrideWriter.writeTo() adds Content-Length to context.getHeaders() - // (possibly -1 for unpredictable sizes). With allowRestrictedHeaders=true - // (needed for V4 Host header), this leaks through HttpURLConnection's - // setOutboundHeaders alongside Transfer-Encoding: chunked, which is invalid - // and can cause server-side issues. Stripping it here ensures only chunked - // encoding is used for encrypted uploads. OutputStream originalOut = context.getOutputStream(); - final MultivaluedMap headers = context.getHeaders(); - OutputStream safeOut = new FilterOutputStream(originalOut) { - private boolean cleaned = false; - @Override - public void write(int b) throws IOException { - stripContentLength(); - out.write(b); - } - @Override - public void write(byte[] b, int off, int len) throws IOException { - stripContentLength(); - out.write(b, off, len); - } - private void stripContentLength() { - if (!cleaned) { - cleaned = true; - headers.remove("Content-Length"); - } - } - }; + OutputStream encodeStream = encodeChain.getEncodeStream(originalOut, userMeta); + + // add pre-stream encode metadata (IV, DEK, etc.) to the outbound headers. + // This ensures the initial PUT has encryption headers as a safety net: if the + // write succeeds but the subsequent copy-update fails, the object is still + // decryptable. + context.getHeaders().putAll(S3ObjectMetadata.getUmdHeaders(userMeta)); + + // re-sign: the encode metadata headers (x-amz-meta-*) are part of the V2/V4 + // signed canonical headers, so the signature computed by AuthorizationFilter + // is now stale. Use the stashed signer to recompute (same pattern as ChecksumFilter). + S3Signer stashedSigner = (S3Signer) context.getProperty(AuthorizationFilter.PROP_SIGNER); + if (stashedSigner != null) { + String method = (String) context.getProperty(AuthorizationFilter.PROP_SIGN_METHOD); + URI uri = (URI) context.getProperty(AuthorizationFilter.PROP_SIGN_URI); + String resource = (String) context.getProperty(AuthorizationFilter.PROP_SIGN_RESOURCE); + @SuppressWarnings("unchecked") + Map parameters = (Map) context.getProperty(AuthorizationFilter.PROP_SIGN_PARAMETERS); + @SuppressWarnings({"unchecked", "rawtypes"}) + Map> signingHeaders = (Map) context.getHeaders(); + stashedSigner.resign(method, uri, resource, parameters, signingHeaders); + } - OutputStream encodeStream = encodeChain.getEncodeStream(safeOut, userMeta); context.setOutputStream(encodeStream); try { diff --git a/src/main/java/com/emc/object/s3/jersey/GeoPinningFilter.java b/src/main/java/com/emc/object/s3/jersey/GeoPinningFilter.java index 94e386a2..ef694dda 100644 --- a/src/main/java/com/emc/object/s3/jersey/GeoPinningFilter.java +++ b/src/main/java/com/emc/object/s3/jersey/GeoPinningFilter.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.emc.object.AbstractJerseyClient; import com.emc.object.Method; import com.emc.object.ObjectConfig; import com.emc.object.s3.S3Constants; @@ -78,7 +79,7 @@ public void filter(ClientRequestContext requestContext) throws IOException { // if this is a read and failover for retries is requested, round-robin the VDCs for each retry if (objectConfig.isGeoReadRetryFailover() && Method.GET.name().equalsIgnoreCase(requestContext.getMethod())) { - Integer retries = (Integer) requestContext.getProperty(RetryFilter.PROP_RETRY_COUNT); + Integer retries = (Integer) requestContext.getProperty(AbstractJerseyClient.PROP_RETRY_COUNT); if (retries != null) { int newIndex = (geoPinIndex + retries) % healthyVdcs.size(); log.info("geo-pin read retry #{}: failing over from primary VDC {} to VDC {}", diff --git a/src/main/java/com/emc/object/s3/jersey/RetryFilter.java b/src/main/java/com/emc/object/s3/jersey/RetryFilter.java deleted file mode 100644 index fe57c184..00000000 --- a/src/main/java/com/emc/object/s3/jersey/RetryFilter.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2015-2016, EMC Corporation. - * Redistribution and use in source and binary forms, with or without modification, - * are permitted provided that the following conditions are met: - * - * + Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * + Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * + The name of EMC Corporation may not be used to endorse or promote - * products derived from this software without specific prior written - * permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR - * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS - * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ -package com.emc.object.s3.jersey; - -import com.emc.object.s3.S3Config; -import com.emc.object.s3.S3Exception; - -import javax.ws.rs.ProcessingException; -import java.io.IOException; -import java.io.InputStream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Retry utility for S3 operations. In Jersey 2.x, filters cannot retry requests, - * so retry logic is implemented as a utility that wraps request execution. - */ -public class RetryFilter { - - private static final Logger log = LoggerFactory.getLogger(RetryFilter.class); - - public static final String PROP_RETRY_COUNT = "com.emc.object.retryCount"; - - private S3Config s3Config; - - public RetryFilter(S3Config s3Config) { - this.s3Config = s3Config; - } - - /** - * Checks whether the given exception is retryable and handles retry logic. - * Returns true if the request should be retried, false otherwise. - * Throws the original exception if retries are exhausted or the error is not retryable. - */ - public boolean shouldRetry(RuntimeException orig, int retryCount, InputStream entityStream) { - Throwable t = orig; - - // in this case, the exception was wrapped by Jersey - if (t instanceof ProcessingException) t = t.getCause(); - - if (t instanceof S3Exception) { - S3Exception se = (S3Exception) t; - - // retry all 50x errors except 501 (not implemented) - if (se.getHttpCode() < 500 || se.getHttpCode() == 501) throw orig; - - // retry all IO exceptions - } else if (!(t instanceof IOException)) throw orig; - - // only retry retryLimit times - if (retryCount > s3Config.getRetryLimit()) throw orig; - - // attempt to reset InputStream - if (entityStream != null) { - try { - if (!entityStream.markSupported()) throw new IOException("stream does not support mark/reset"); - entityStream.reset(); - } catch (IOException e) { - log.warn("could not reset entity stream for retry: " + e); - throw orig; - } - } - - // wait for retry delay - if (s3Config.getInitialRetryDelay() > 0) { - int retryDelay = s3Config.getInitialRetryDelay() * (int) Math.pow(2, retryCount - 1); - try { - log.debug("waiting {}ms before retry", retryDelay); - Thread.sleep(retryDelay); - } catch (InterruptedException e) { - log.warn("interrupted while waiting to retry: " + e.getMessage()); - } - } - - log.info("error received in response [{}], retrying ({} of {})...", new Object[] { t, retryCount, s3Config.getRetryLimit() }); - return true; - } - - public int getRetryBufferSize() { - return s3Config.getRetryBufferSize(); - } -} diff --git a/src/main/java/com/emc/object/s3/jersey/S3JerseyClient.java b/src/main/java/com/emc/object/s3/jersey/S3JerseyClient.java index 6f0b0c8b..4f16cfee 100644 --- a/src/main/java/com/emc/object/s3/jersey/S3JerseyClient.java +++ b/src/main/java/com/emc/object/s3/jersey/S3JerseyClient.java @@ -125,7 +125,6 @@ public class S3JerseyClient extends AbstractJerseyClient implements S3Client { protected S3Signer signer; protected SmartConfig smartConfig; - protected RetryFilter retryFilter; public S3JerseyClient(S3Config s3Config) { super(new S3Config(s3Config)); // deep-copy config so that two clients don't share the same host lists (SDK-122) @@ -186,7 +185,6 @@ public S3JerseyClient(S3Config s3Config) { client.register(new NamespaceFilter(this.s3Config)); client.register(new BucketFilter(this.s3Config)); if (this.s3Config.isGeoPinningEnabled()) client.register(new GeoPinningFilter(this.s3Config)); - if (this.s3Config.isRetryEnabled()) retryFilter = new RetryFilter(this.s3Config); client.register(new AuthorizationFilter(this.s3Config)); if (this.s3Config.isChecksumEnabled()) client.register(new ChecksumFilter(this.s3Config)); if (this.s3Config.getFaultInjectionRate() > 0.0f) @@ -223,8 +221,23 @@ public LoadBalancer getLoadBalancer() { } @Override - protected RetryFilter getRetryFilter() { - return retryFilter; + protected boolean isRetryEnabled() { + return s3Config.isRetryEnabled(); + } + + @Override + protected int getRetryLimit() { + return s3Config.getRetryLimit(); + } + + @Override + protected int getInitialRetryDelay() { + return s3Config.getInitialRetryDelay(); + } + + @Override + protected int getRetryBufferSize() { + return s3Config.getRetryBufferSize(); } @Override diff --git a/src/test/java/com/emc/object/s3/ChecksumFilterTest.java b/src/test/java/com/emc/object/s3/ChecksumFilterTest.java index 284cd533..2928ff80 100644 --- a/src/test/java/com/emc/object/s3/ChecksumFilterTest.java +++ b/src/test/java/com/emc/object/s3/ChecksumFilterTest.java @@ -27,46 +27,240 @@ package com.emc.object.s3; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Random; +import java.util.concurrent.Future; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.ClientRequestContext; +import javax.ws.rs.client.ClientResponseContext; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Configuration; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.WriterInterceptorContext; import org.apache.commons.codec.digest.DigestUtils; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientRequest; +import org.glassfish.jersey.client.ClientResponse; +import org.glassfish.jersey.client.spi.AsyncConnectorCallback; +import org.glassfish.jersey.client.spi.Connector; +import org.glassfish.jersey.client.spi.ConnectorProvider; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import com.emc.object.s3.jersey.ChecksumFilter; import com.emc.object.util.ChecksumAlgorithm; import com.emc.object.util.ChecksumError; -import com.emc.object.util.ChecksumValueImpl; import com.emc.object.util.ChecksummedInputStream; +import com.emc.object.util.RestUtil; +import com.emc.object.util.RunningChecksum; + +import static org.mockito.Mockito.*; public class ChecksumFilterTest { + + private static final String PROP_WRITE_CHECKSUM = "com.emc.object.checksumFilter.writeChecksum"; + + // ----------------------------------------------------------------------- + // Unit tests: each filter method tested in isolation via mocks + // ----------------------------------------------------------------------- + + @Test + public void testWriteInterceptorWrapsStreamAndStoresChecksum() throws Exception { + WriterInterceptorContext ctx = mock(WriterInterceptorContext.class); + when(ctx.getProperty(RestUtil.PROPERTY_VERIFY_WRITE_CHECKSUM)).thenReturn(Boolean.TRUE); + when(ctx.getProperty(RestUtil.PROPERTY_GENERATE_CONTENT_MD5)).thenReturn(null); + when(ctx.getOutputStream()).thenReturn(new ByteArrayOutputStream()); + + new ChecksumFilter(new S3Config()).aroundWriteTo(ctx); + + // the output stream must be wrapped with a checksumming stream + ArgumentCaptor streamCaptor = ArgumentCaptor.forClass(OutputStream.class); + verify(ctx).setOutputStream(streamCaptor.capture()); + Assertions.assertNotNull(streamCaptor.getValue()); + + // the running checksum must be stored so the response filter can compare it + verify(ctx).setProperty(eq(PROP_WRITE_CHECKSUM), any(RunningChecksum.class)); + + // the chain must be continued + verify(ctx).proceed(); + } + + @Test + public void testResponseFilterPassesOnCorrectWriteChecksum() throws Exception { + byte[] data = new byte[1024]; + new Random().nextBytes(data); + String correctMd5 = DigestUtils.md5Hex(data); + + RunningChecksum storedChecksum = new RunningChecksum(ChecksumAlgorithm.MD5); + storedChecksum.update(data, 0, data.length); + + ClientRequestContext reqCtx = mock(ClientRequestContext.class); + ClientResponseContext respCtx = mock(ClientResponseContext.class); + MultivaluedHashMap headers = new MultivaluedHashMap<>(); + headers.putSingle(RestUtil.EMC_CONTENT_MD5, correctMd5); + + when(reqCtx.getProperty(RestUtil.PROPERTY_VERIFY_WRITE_CHECKSUM)).thenReturn(Boolean.TRUE); + when(reqCtx.getProperty(RestUtil.PROPERTY_VERIFY_READ_CHECKSUM)).thenReturn(null); + when(reqCtx.getProperty(PROP_WRITE_CHECKSUM)).thenReturn(storedChecksum); + when(respCtx.getHeaders()).thenReturn(headers); + + // should not throw when checksum matches + new ChecksumFilter(new S3Config()).filter(reqCtx, respCtx); + } + @Test - public void testChecksumVerification() throws Exception { + public void testResponseFilterThrowsOnWriteChecksumMismatch() throws Exception { byte[] data = new byte[1024]; new Random().nextBytes(data); + RunningChecksum storedChecksum = new RunningChecksum(ChecksumAlgorithm.MD5); + storedChecksum.update(data, 0, data.length); + + ClientRequestContext reqCtx = mock(ClientRequestContext.class); + ClientResponseContext respCtx = mock(ClientResponseContext.class); + MultivaluedHashMap headers = new MultivaluedHashMap<>(); + headers.putSingle(RestUtil.EMC_CONTENT_MD5, "abcdef0123456789abcdef0123456789"); + + when(reqCtx.getProperty(RestUtil.PROPERTY_VERIFY_WRITE_CHECKSUM)).thenReturn(Boolean.TRUE); + when(reqCtx.getProperty(RestUtil.PROPERTY_VERIFY_READ_CHECKSUM)).thenReturn(null); + when(reqCtx.getProperty(PROP_WRITE_CHECKSUM)).thenReturn(storedChecksum); + when(respCtx.getHeaders()).thenReturn(headers); + + Assertions.assertThrows(ChecksumError.class, + () -> new ChecksumFilter(new S3Config()).filter(reqCtx, respCtx)); + } + + @Test + public void testResponseFilterWrapsEntityStreamForReadVerification() throws Exception { + byte[] data = new byte[1024]; + new Random().nextBytes(data); String correctMd5 = DigestUtils.md5Hex(data); - // positive test - correct checksum should not throw - ChecksummedInputStream goodStream = new ChecksummedInputStream( - new ByteArrayInputStream(data), - new ChecksumValueImpl(ChecksumAlgorithm.MD5, data.length, correctMd5)); - byte[] buffer = new byte[1024]; - int total = 0, n; - while ((n = goodStream.read(buffer)) >= 0) total += n; - goodStream.close(); - Assertions.assertEquals(data.length, total); - - // negative test - bad checksum should throw ChecksumError + ClientRequestContext reqCtx = mock(ClientRequestContext.class); + ClientResponseContext respCtx = mock(ClientResponseContext.class); + MultivaluedHashMap headers = new MultivaluedHashMap<>(); + headers.putSingle(RestUtil.HEADER_ETAG, correctMd5); + + when(reqCtx.getProperty(RestUtil.PROPERTY_VERIFY_WRITE_CHECKSUM)).thenReturn(null); + when(reqCtx.getProperty(RestUtil.PROPERTY_VERIFY_READ_CHECKSUM)).thenReturn(Boolean.TRUE); + when(respCtx.getHeaders()).thenReturn(headers); + when(respCtx.getEntityStream()).thenReturn(new ByteArrayInputStream(data)); + + new ChecksumFilter(new S3Config()).filter(reqCtx, respCtx); + + // the response stream must be wrapped with a ChecksummedInputStream + ArgumentCaptor streamCaptor = ArgumentCaptor.forClass(InputStream.class); + verify(respCtx).setEntityStream(streamCaptor.capture()); + Assertions.assertInstanceOf(ChecksummedInputStream.class, streamCaptor.getValue()); + } + + // ----------------------------------------------------------------------- + // End-to-end tests: real Jersey client pipeline proves that the checksum + // stored by aroundWriteTo() via WriterInterceptorContext.setProperty() + // propagates to filter() via ClientRequestContext.getProperty(). + // ----------------------------------------------------------------------- + + @Test + public void testEndToEndWriteChecksumThrowsOnMismatch() throws Exception { + byte[] data = new byte[1024]; + new Random().nextBytes(data); + String wrongMd5 = "abcdef0123456789abcdef0123456789"; + + Client client = ClientBuilder.newClient( + new ClientConfig().connectorProvider(new MockConnector(wrongMd5))); + client.register(new ChecksumFilter(new S3Config())); try { - ChecksummedInputStream badStream = new ChecksummedInputStream( - new ByteArrayInputStream(data), - new ChecksumValueImpl(ChecksumAlgorithm.MD5, data.length, "abcdef0123456789abcdef0123456789")); - buffer = new byte[1024]; - while (badStream.read(buffer) >= 0) { /* read to EOF to trigger verification */ } - badStream.close(); - Assertions.fail("bad MD5 should throw exception"); - } catch (ChecksumError e) { - // expected + client.target("http://localhost/test") + .request() + .property(RestUtil.PROPERTY_VERIFY_WRITE_CHECKSUM, Boolean.TRUE) + .put(Entity.entity(data, "application/octet-stream")); + Assertions.fail("expected ChecksumError wrapped in ProcessingException"); + } catch (ProcessingException e) { + Assertions.assertInstanceOf(ChecksumError.class, e.getCause(), + "root cause must be a ChecksumError, was: " + e.getCause()); + } finally { + client.close(); } } + + @Test + public void testEndToEndWriteChecksumPassesOnMatch() throws Exception { + byte[] data = new byte[1024]; + new Random().nextBytes(data); + String correctMd5 = DigestUtils.md5Hex(data); + + Client client = ClientBuilder.newClient( + new ClientConfig().connectorProvider(new MockConnector(correctMd5))); + client.register(new ChecksumFilter(new S3Config())); + try { + Response response = client.target("http://localhost/test") + .request() + .property(RestUtil.PROPERTY_VERIFY_WRITE_CHECKSUM, Boolean.TRUE) + .put(Entity.entity(data, "application/octet-stream")); + Assertions.assertEquals(200, response.getStatus()); + } finally { + client.close(); + } + } + + // ----------------------------------------------------------------------- + // Mock connector: acts as an in-process "server" so tests run without + // a real HTTP endpoint. Calling request.writeEntity() drains the entity + // through the full WriterInterceptor chain (including ChecksumFilter), + // then a fake response is returned with a caller-supplied MD5 header. + // ----------------------------------------------------------------------- + + private static class MockConnector implements Connector, ConnectorProvider { + private final String responseMd5; + + MockConnector(String responseMd5) { + this.responseMd5 = responseMd5; + } + + @Override + public Connector getConnector(Client client, Configuration runtimeConfig) { + return this; + } + + @Override + public ClientResponse apply(ClientRequest request) throws ProcessingException { + // Provide a sink stream and run the WriterInterceptor chain; this is + // where ChecksumFilter.aroundWriteTo() stores the RunningChecksum into + // the request property bag via WriterInterceptorContext.setProperty(). + request.setStreamProvider(contentLength -> new ByteArrayOutputStream()); + try { + request.writeEntity(); + } catch (IOException e) { + throw new ProcessingException(e); + } + + // Return a fake 200 response carrying the configured MD5 header. + // ChecksumFilter.filter() will pick it up from ClientRequestContext + // (same property bag) and compare against the stored checksum. + ClientResponse response = new ClientResponse(Response.Status.OK, request); + response.headers(RestUtil.EMC_CONTENT_MD5, responseMd5); + response.setEntityStream(new ByteArrayInputStream(new byte[0])); + return response; + } + + @Override + public Future apply(ClientRequest request, AsyncConnectorCallback callback) { + throw new UnsupportedOperationException("async not supported by MockConnector"); + } + + @Override + public String getName() { return "MockConnector"; } + + @Override + public void close() { } + } } diff --git a/src/test/java/com/emc/object/s3/GeoPinningTest.java b/src/test/java/com/emc/object/s3/GeoPinningTest.java index e6075330..f883d44f 100644 --- a/src/test/java/com/emc/object/s3/GeoPinningTest.java +++ b/src/test/java/com/emc/object/s3/GeoPinningTest.java @@ -26,17 +26,20 @@ */ package com.emc.object.s3; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import com.emc.object.AbstractJerseyClient; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; import com.emc.object.ObjectConfig; +import com.emc.object.s3.jersey.GeoPinningFilter; import com.emc.object.s3.jersey.GeoPinningRule; import com.emc.object.s3.jersey.S3JerseyClient; import com.emc.object.util.GeoPinningUtil; @@ -148,42 +151,45 @@ public void testVdcDistribution() { @Test public void testReadRetryFailoverInFilter() throws Exception { - // create a separate client with geo-read-retry-failover enabled S3Config s3ConfigF = createS3Config(); s3ConfigF.setGeoReadRetryFailover(true); - S3JerseyClient failoverClient = new S3JerseyClient(s3ConfigF); - Thread.sleep(500); // wait for polling daemon - - try { - String key = "my/object/key"; - int geoIndex = 0xbb8619 % vdcs.size(); - - // write the test object - failoverClient.putObject(getTestBucket(), key, "Hello GeoPinning!", "text/plain"); - - LoadBalancer loadBalancer = failoverClient.getLoadBalancer(); - loadBalancer.resetStats(); - - // read the object 10 times — should all route to the geo-pinned VDC - for (int i = 0; i < 10; i++) { - failoverClient.readObject(getTestBucket(), key, String.class); - } - - // verify no errors and total count - Assert.assertEquals(0, loadBalancer.getTotalErrors()); - Assert.assertEquals(10, loadBalancer.getTotalConnections()); - - // verify reads are routed to the correct VDC - for (HostStats stats : loadBalancer.getHostStats()) { - if (vdcs.get(geoIndex).equals(((VdcHost) stats).getVdc())) { - Assert.assertTrue(stats.getTotalConnections() > 0); - } else { - Assert.assertEquals(0, stats.getTotalConnections()); - } - } - } finally { - failoverClient.destroy(); - } + GeoPinningFilter filter = new GeoPinningFilter(s3ConfigF); + + String bucket = getTestBucket(); + String key = "my/object/key"; + int geoIndex = 0xbb8619 % vdcs.size(); + URI uri = URI.create("http://localhost/test"); + + // no retry: routed to the primary geo-pinned VDC + TestClientRequestContexts.StubClientRequestContext ctx0 = + TestClientRequestContexts.request("GET", uri); + ctx0.setProperty(S3Constants.PROPERTY_BUCKET_NAME, bucket); + ctx0.setProperty(S3Constants.PROPERTY_OBJECT_KEY, key); + filter.filter(ctx0); + Vdc primary = (Vdc) ctx0.getProperty(GeoPinningRule.PROP_GEO_PINNED_VDC); + Assert.assertEquals(vdcs.get(geoIndex), primary); + + // retry 1: must failover to a different VDC + TestClientRequestContexts.StubClientRequestContext ctx1 = + TestClientRequestContexts.request("GET", uri); + ctx1.setProperty(S3Constants.PROPERTY_BUCKET_NAME, bucket); + ctx1.setProperty(S3Constants.PROPERTY_OBJECT_KEY, key); + ctx1.setProperty(AbstractJerseyClient.PROP_RETRY_COUNT, 1); + filter.filter(ctx1); + Vdc failover1 = (Vdc) ctx1.getProperty(GeoPinningRule.PROP_GEO_PINNED_VDC); + Assert.assertNotEquals("retry 1 must use a different VDC", primary, failover1); + Assert.assertEquals(vdcs.get((geoIndex + 1) % vdcs.size()), failover1); + + // retry 2: must failover to yet another VDC + TestClientRequestContexts.StubClientRequestContext ctx2 = + TestClientRequestContexts.request("GET", uri); + ctx2.setProperty(S3Constants.PROPERTY_BUCKET_NAME, bucket); + ctx2.setProperty(S3Constants.PROPERTY_OBJECT_KEY, key); + ctx2.setProperty(AbstractJerseyClient.PROP_RETRY_COUNT, 2); + filter.filter(ctx2); + Vdc failover2 = (Vdc) ctx2.getProperty(GeoPinningRule.PROP_GEO_PINNED_VDC); + Assert.assertNotEquals("retry 2 must differ from retry 1", failover1, failover2); + Assert.assertEquals(vdcs.get((geoIndex + 2) % vdcs.size()), failover2); } protected void testKeyDistribution(String key, int vdcIndex) { diff --git a/src/test/java/com/emc/object/s3/S3EncryptionClientBasicTest.java b/src/test/java/com/emc/object/s3/S3EncryptionClientBasicTest.java index ee92a517..3fa878fd 100644 --- a/src/test/java/com/emc/object/s3/S3EncryptionClientBasicTest.java +++ b/src/test/java/com/emc/object/s3/S3EncryptionClientBasicTest.java @@ -432,6 +432,11 @@ public void testDeleteBucketWithObjects() { public void testDeleteBucketWithBackgroundTasks() { } + @Ignore + @Override + public void testDeleteBucketWithMPUWithBackgroundTasks() { + } + @Ignore @Override public void testDeleteBucketInRetentionWithBackgroundTasks() { diff --git a/src/test/java/com/emc/object/s3/S3JerseyClientTest.java b/src/test/java/com/emc/object/s3/S3JerseyClientTest.java index 9c2bcd9b..ae4e4772 100644 --- a/src/test/java/com/emc/object/s3/S3JerseyClientTest.java +++ b/src/test/java/com/emc/object/s3/S3JerseyClientTest.java @@ -101,10 +101,6 @@ public void testMultipleVdcs() throws Exception { Assume.assumeNoException(e); } - // requires a multi-node ECS cluster to meaningfully exercise load balancing - Assume.assumeTrue("testMultipleVdcs requires a multi-node ECS cluster", - config.getVdcs().get(0).getHosts().size() > 1); - // just going to use the same VDC twice for lack of a geo env. List hosts = config.getVdcs().get(0).getHosts(); Vdc vdc1 = new Vdc("vdc1", new ArrayList(hosts)), vdc2 = new Vdc("vdc2", new ArrayList(hosts)); @@ -116,7 +112,7 @@ public void testMultipleVdcs() throws Exception { S3JerseyClient tempClient = new S3JerseyClient(config); - Thread.sleep(1000); // wait for poll to complete + Thread.sleep(3000); // wait for poll to complete // the client will clone the config, so we have to get new references vdc1 = tempClient.getS3Config().getVdcs().get(0); @@ -579,9 +575,6 @@ public void testDeleteBucketWithBackgroundTasks() throws Exception { @Test public void testDeleteBucketWithMPUWithBackgroundTasks() throws Exception { Assume.assumeTrue("ECS version must be at least 3.8", ecsVersion != null && ecsVersion.compareTo("3.8") >= 0); - // MPU is not supported by the encryption client - Assume.assumeFalse("MPU is not supported by the encryption client", - client instanceof com.emc.object.s3.jersey.S3EncryptionClient); String bucketName = getTestBucket() + "-mpu"; client.createBucket(bucketName); diff --git a/src/test/java/com/emc/object/s3/S3V2AuthUtilTest.java b/src/test/java/com/emc/object/s3/S3V2AuthUtilTest.java index f1411869..16cea255 100644 --- a/src/test/java/com/emc/object/s3/S3V2AuthUtilTest.java +++ b/src/test/java/com/emc/object/s3/S3V2AuthUtilTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import javax.ws.rs.client.ClientRequestContext; import javax.ws.rs.core.MultivaluedMap; import java.net.URI; import java.util.Date; @@ -121,23 +122,26 @@ public void testSign() throws Exception { S3Config s3Config = new S3Config(new URI("http://here.com")).withIdentity(ACCESS_KEY).withSecretKey(SECRET_KEY); S3SignerV2 signer = new S3SignerV2(s3Config); - // Test signing via string-to-sign and signature verification - // (sign() now requires ClientRequestContext, so we test the underlying methods directly) - String stringToSign1 = signer.getStringToSign(METHOD_1, RESOURCE_1, PARAMETERS_1, HEADERS_1); - String signature1 = signer.getSignature(stringToSign1, null); - Assertions.assertEquals(SIGNATURE_1, signature1); + ClientRequestContext request = TestClientRequestContexts.request(METHOD_1, new URI("http://s3.company.com")); - String stringToSign2 = signer.getStringToSign(METHOD_2, RESOURCE_2, PARAMETERS_2, HEADERS_2); - String signature2 = signer.getSignature(stringToSign2, null); - Assertions.assertEquals(SIGNATURE_2, signature2); + MultivaluedHashMap headers = new MultivaluedHashMap<>(HEADERS_1); + signer.sign(request, RESOURCE_1, PARAMETERS_1, headers); + Assertions.assertEquals("AWS " + ACCESS_KEY + ":" + SIGNATURE_1, headers.getFirst("Authorization")); - String stringToSign3 = signer.getStringToSign(METHOD_3, RESOURCE_3, PARAMETERS_3, HEADERS_3); - String signature3 = signer.getSignature(stringToSign3, null); - Assertions.assertEquals(SIGNATURE_3, signature3); + headers = new MultivaluedHashMap<>(HEADERS_2); + request = TestClientRequestContexts.request(METHOD_2, new URI("http://s3.company.com")); + signer.sign(request, RESOURCE_2, PARAMETERS_2, headers); + Assertions.assertEquals("AWS " + ACCESS_KEY + ":" + SIGNATURE_2, headers.getFirst("Authorization")); - String stringToSign4 = signer.getStringToSign(METHOD_4, RESOURCE_4, PARAMETERS_4, HEADERS_4); - String signature4 = signer.getSignature(stringToSign4, null); - Assertions.assertEquals(SIGNATURE_4, signature4); + headers = new MultivaluedHashMap<>(HEADERS_3); + request = TestClientRequestContexts.request(METHOD_3, new URI("http://s3.company.com")); + signer.sign(request, RESOURCE_3, PARAMETERS_3, headers); + Assertions.assertEquals("AWS " + ACCESS_KEY + ":" + SIGNATURE_3, headers.getFirst("Authorization")); + + headers = new MultivaluedHashMap<>(HEADERS_4); + request = TestClientRequestContexts.request(METHOD_4, new URI("http://s3.company.com")); + signer.sign(request, RESOURCE_4, PARAMETERS_4, headers); + Assertions.assertEquals("AWS " + ACCESS_KEY + ":" + SIGNATURE_4, headers.getFirst("Authorization")); } @Test diff --git a/src/test/java/com/emc/object/util/ChecksummedInputStreamTest.java b/src/test/java/com/emc/object/util/ChecksummedInputStreamTest.java new file mode 100644 index 00000000..c65db414 --- /dev/null +++ b/src/test/java/com/emc/object/util/ChecksummedInputStreamTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2015, EMC Corporation. + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * + Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + The name of EMC Corporation may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +package com.emc.object.util; + +import java.io.ByteArrayInputStream; +import java.util.Random; + +import org.apache.commons.codec.digest.DigestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ChecksummedInputStreamTest { + @Test + public void testChecksumVerification() throws Exception { + byte[] data = new byte[1024]; + new Random().nextBytes(data); + + String correctMd5 = DigestUtils.md5Hex(data); + + // positive test - correct checksum should not throw + ChecksummedInputStream goodStream = new ChecksummedInputStream( + new ByteArrayInputStream(data), + new ChecksumValueImpl(ChecksumAlgorithm.MD5, data.length, correctMd5)); + byte[] buffer = new byte[1024]; + int total = 0, n; + while ((n = goodStream.read(buffer)) >= 0) total += n; + goodStream.close(); + Assertions.assertEquals(data.length, total); + + // negative test - bad checksum should throw ChecksumError + try { + ChecksummedInputStream badStream = new ChecksummedInputStream( + new ByteArrayInputStream(data), + new ChecksumValueImpl(ChecksumAlgorithm.MD5, data.length, "abcdef0123456789abcdef0123456789")); + buffer = new byte[1024]; + while (badStream.read(buffer) >= 0) { /* read to EOF to trigger verification */ } + badStream.close(); + Assertions.fail("bad MD5 should throw exception"); + } catch (ChecksumError e) { + // expected + } + } +}