From fbc54d970bd4e6bdaaa39441a10bf5f9b70fc271 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Thu, 30 Jul 2020 10:56:27 +0200 Subject: [PATCH 1/3] [BEAM-601] Run KinesisIOIT withtestcontainers with localstack --- sdks/java/io/kinesis/build.gradle | 1 + .../sdk/io/kinesis/BasicKinesisProvider.java | 20 ++- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 44 +++++ .../beam/sdk/io/kinesis/KinesisIOIT.java | 150 +++++++++++++++--- .../sdk/io/kinesis/KinesisTestOptions.java | 13 ++ 5 files changed, 207 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/kinesis/build.gradle b/sdks/java/io/kinesis/build.gradle index ce8c5a6fffb2..1f62346aab44 100644 --- a/sdks/java/io/kinesis/build.gradle +++ b/sdks/java/io/kinesis/build.gradle @@ -50,6 +50,7 @@ dependencies { testCompile library.java.powermock testCompile library.java.powermock_mockito testCompile "org.assertj:assertj-core:3.11.1" + testCompile 'org.testcontainers:localstack:1.11.2' testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java index 8f5d379d8f21..d80278b404ae 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/BasicKinesisProvider.java @@ -31,6 +31,7 @@ import com.amazonaws.services.kinesis.producer.IKinesisProducer; import com.amazonaws.services.kinesis.producer.KinesisProducer; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import java.net.URI; import org.checkerframework.checker.nullness.qual.Nullable; /** Basic implementation of {@link AWSClientsProvider} used by default in {@link KinesisIO}. */ @@ -39,9 +40,14 @@ class BasicKinesisProvider implements AWSClientsProvider { private final String secretKey; private final Regions region; private final @Nullable String serviceEndpoint; + private final boolean verifyCertificate; BasicKinesisProvider( - String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) { + String accessKey, + String secretKey, + Regions region, + @Nullable String serviceEndpoint, + boolean verifyCertificate) { checkArgument(accessKey != null, "accessKey can not be null"); checkArgument(secretKey != null, "secretKey can not be null"); checkArgument(region != null, "region can not be null"); @@ -49,6 +55,12 @@ class BasicKinesisProvider implements AWSClientsProvider { this.secretKey = secretKey; this.region = region; this.serviceEndpoint = serviceEndpoint; + this.verifyCertificate = verifyCertificate; + } + + BasicKinesisProvider( + String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) { + this(accessKey, secretKey, region, serviceEndpoint, true); } private AWSCredentialsProvider getCredentialsProvider() { @@ -85,6 +97,12 @@ public AmazonCloudWatch getCloudWatchClient() { public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) { config.setRegion(region.getName()); config.setCredentialsProvider(getCredentialsProvider()); + if (serviceEndpoint != null) { + URI uri = URI.create(serviceEndpoint); + config.setKinesisEndpoint(uri.getHost()); + config.setKinesisPort(uri.getPort()); + } + config.setVerifyCertificate(verifyCertificate); return new KinesisProducer(config); } } diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index b50da1742bed..62f3e141bae5 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -413,6 +413,28 @@ public Read withAWSClientsProvider( new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint)); } + /** + * Specify credential details and region to be used to read from Kinesis. If you need more + * sophisticated credential protocol, then you should look at {@link + * Read#withAWSClientsProvider(AWSClientsProvider)}. + * + *

The {@code serviceEndpoint} sets an alternative service host. This is useful to execute + * the tests with Kinesis service emulator. + * + *

The {@code veriftCertificate} disables or enables certificate verification. Never set it + * to false in production. + */ + public Read withAWSClientsProvider( + String awsAccessKey, + String awsSecretKey, + Regions region, + String serviceEndpoint, + boolean verifyCertificate) { + return withAWSClientsProvider( + new BasicKinesisProvider( + awsAccessKey, awsSecretKey, region, serviceEndpoint, verifyCertificate)); + } + /** Specifies to read at most a given number of records. */ public Read withMaxNumRecords(long maxNumRecords) { checkArgument( @@ -670,6 +692,28 @@ public Write withAWSClientsProvider( new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint)); } + /** + * Specify credential details and region to be used to write to Kinesis. If you need more + * sophisticated credential protocol, then you should look at {@link + * Write#withAWSClientsProvider(AWSClientsProvider)}. + * + *

The {@code serviceEndpoint} sets an alternative service host. This is useful to execute + * the tests with Kinesis service emulator. + * + *

The {@code veriftCertificate} disables or enables certificate verification. Never set it + * to false in production. + */ + public Write withAWSClientsProvider( + String awsAccessKey, + String awsSecretKey, + Regions region, + String serviceEndpoint, + boolean verifyCertificate) { + return withAWSClientsProvider( + new BasicKinesisProvider( + awsAccessKey, awsSecretKey, region, serviceEndpoint, verifyCertificate)); + } + /** * Specify the number of retries that will be used to flush the outstanding records in case if * they were not flushed from the first time. Default number of retries is {@code diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java index 5f3a003367f7..e53a12d8f065 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java @@ -17,11 +17,19 @@ */ package org.apache.beam.sdk.io.kinesis; +import com.amazonaws.SDKGlobalConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Random; +import java.util.UUID; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.TestRow; @@ -34,34 +42,52 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; -import org.joda.time.Instant; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.testcontainers.containers.localstack.LocalStackContainer; /** * Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link - * KinesisTestOptions} in order to run this. + * KinesisTestOptions} in order to run this if you want to test it with production setup. By default + * when no options are provided an instance of localstack is used. */ @RunWith(JUnit4.class) public class KinesisIOIT implements Serializable { - private static int numberOfShards; - private static int numberOfRows; + private static final String LOCALSTACK_VERSION = "0.11.3"; @Rule public TestPipeline pipelineWrite = TestPipeline.create(); @Rule public TestPipeline pipelineRead = TestPipeline.create(); + private static LocalStackContainer localstackContainer; + private static String streamName; + private static AmazonKinesis kinesisClient; + private static KinesisTestOptions options; - private static final Instant now = Instant.now(); @BeforeClass - public static void setup() { + public static void setup() throws Exception { PipelineOptionsFactory.register(KinesisTestOptions.class); options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class); - numberOfShards = options.getNumberOfShards(); - numberOfRows = options.getNumberOfRecords(); + if (doUseLocalstack()) { + setupLocalstack(); + } + kinesisClient = createKinesisClient(); + streamName = "beam_test_kinesis" + UUID.randomUUID(); + createStream(); + } + + @AfterClass + public static void teardown() { + if (doUseLocalstack()) { + System.clearProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY); + System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY); + localstackContainer.stop(); + } + kinesisClient.deleteStream(streamName); } /** Test which write and then read data for a Kinesis stream. */ @@ -74,18 +100,20 @@ public void testWriteThenRead() { /** Write test dataset into Kinesis stream. */ private void runWrite() { pipelineWrite - .apply("Generate Sequence", GenerateSequence.from(0).to((long) numberOfRows)) + .apply("Generate Sequence", GenerateSequence.from(0).to(options.getNumberOfRecords())) .apply("Prepare TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())) .apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes())) .apply( "Write to Kinesis", KinesisIO.write() - .withStreamName(options.getAwsKinesisStream()) + .withStreamName(streamName) .withPartitioner(new RandomPartitioner()) .withAWSClientsProvider( options.getAwsAccessKey(), options.getAwsSecretKey(), - Regions.fromName(options.getAwsKinesisRegion()))); + Regions.fromName(options.getAwsKinesisRegion()), + options.getAwsServiceEndpoint(), + options.getAwsVerifyCertificate())); pipelineWrite.run().waitUntilFinish(); } @@ -95,20 +123,21 @@ private void runRead() { PCollection output = pipelineRead.apply( KinesisIO.read() - .withStreamName(options.getAwsKinesisStream()) + .withStreamName(streamName) .withAWSClientsProvider( options.getAwsAccessKey(), options.getAwsSecretKey(), - Regions.fromName(options.getAwsKinesisRegion())) - .withMaxNumRecords(numberOfRows) + Regions.fromName(options.getAwsKinesisRegion()), + options.getAwsServiceEndpoint(), + options.getAwsVerifyCertificate()) + .withMaxNumRecords(options.getNumberOfRecords()) // to prevent endless running in case of error - .withMaxReadTime(Duration.standardMinutes(10)) - .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP) - .withInitialTimestampInStream(now) + .withMaxReadTime(Duration.standardMinutes(10L)) + .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) .withRequestRecordsLimit(1000)); PAssert.thatSingleton(output.apply("Count All", Count.globally())) - .isEqualTo((long) numberOfRows); + .isEqualTo((long) options.getNumberOfRecords()); PCollection consolidatedHashcode = output @@ -116,11 +145,92 @@ private void runRead() { .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults()); PAssert.that(consolidatedHashcode) - .containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows)); + .containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords())); pipelineRead.run().waitUntilFinish(); } + /** Necessary setup for localstack environment. */ + private static void setupLocalstack() { + System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); + System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true"); + + localstackContainer = + new LocalStackContainer(LOCALSTACK_VERSION) + .withServices(LocalStackContainer.Service.KINESIS) + .withEnv("USE_SSL", "true") + .withStartupAttempts(3); + localstackContainer.start(); + + options.setAwsServiceEndpoint( + localstackContainer + .getEndpointConfiguration(LocalStackContainer.Service.KINESIS) + .getServiceEndpoint() + .replace("http", "https")); + options.setAwsKinesisRegion( + localstackContainer + .getEndpointConfiguration(LocalStackContainer.Service.KINESIS) + .getSigningRegion()); + options.setAwsAccessKey( + localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId()); + options.setAwsSecretKey( + localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey()); + options.setNumberOfRecords(1000); + options.setNumberOfShards(1); + options.setAwsKinesisStream("beam_kinesis_test"); + options.setAwsVerifyCertificate(false); + } + + private static AmazonKinesis createKinesisClient() { + AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); + + AWSCredentialsProvider credentialsProvider = + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())); + clientBuilder.setCredentials(credentialsProvider); + + if (options.getAwsServiceEndpoint() != null) { + AwsClientBuilder.EndpointConfiguration endpointConfiguration = + new AwsClientBuilder.EndpointConfiguration( + options.getAwsServiceEndpoint(), options.getAwsKinesisRegion()); + clientBuilder.setEndpointConfiguration(endpointConfiguration); + } else { + clientBuilder.setRegion(options.getAwsKinesisRegion()); + } + + return clientBuilder.build(); + } + + private static void createStream() throws Exception { + kinesisClient.createStream(streamName, 1); + int repeats = 10; + for (int i = 0; i <= repeats; ++i) { + String streamStatus = + kinesisClient.describeStream(streamName).getStreamDescription().getStreamStatus(); + if ("ACTIVE".equals(streamStatus)) { + break; + } + if (i == repeats) { + throw new RuntimeException("Unable to initialize stream"); + } + Thread.sleep(1000L); + } + } + + /** Check whether pipeline options were provided. If not, use localstack container. */ + private static boolean doUseLocalstack() { + KinesisTestOptions defaults = PipelineOptionsFactory.fromArgs().as(KinesisTestOptions.class); + return defaults.getAwsAccessKey().equals(options.getAwsAccessKey()) + && defaults.getAwsSecretKey().equals(options.getAwsSecretKey()) + && defaults.getAwsKinesisStream().equals(options.getAwsKinesisStream()) + && defaults.getAwsKinesisRegion().equals(options.getAwsKinesisRegion()) + && defaults.getNumberOfShards().equals(options.getNumberOfShards()) + && defaults.getNumberOfRecords().equals(options.getNumberOfRecords()) + && (options.getAwsServiceEndpoint() == null + || options.getAwsServiceEndpoint().equals(defaults.getAwsServiceEndpoint())) + && defaults.getAwsVerifyCertificate().equals(options.getAwsVerifyCertificate()); + } + /** Produces test rows. */ private static class ConvertToBytes extends DoFn { @ProcessElement @@ -141,7 +251,7 @@ private static final class RandomPartitioner implements KinesisPartitioner { @Override public String getPartitionKey(byte[] value) { Random rand = new Random(); - int n = rand.nextInt(numberOfShards) + 1; + int n = rand.nextInt(options.getNumberOfShards()) + 1; return String.valueOf(n); } diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java index 185f9532a901..ff6ef924393f 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; /** Options for Kinesis integration tests. */ public interface KinesisTestOptions extends TestPipelineOptions { @@ -48,6 +49,18 @@ public interface KinesisTestOptions extends TestPipelineOptions { void setAwsAccessKey(String value); + @Description("Aws service endpoint") + @Nullable + String getAwsServiceEndpoint(); + + void setAwsServiceEndpoint(String awsServiceEndpoint); + + @Description("Flag for certificate verification") + @Default.Boolean(true) + Boolean getAwsVerifyCertificate(); + + void setAwsVerifyCertificate(Boolean awsVerifyCertificate); + @Description("Number of shards of stream") @Default.Integer(2) Integer getNumberOfShards(); From eb11a8ad83ecc932ad1e723b93b2f879710027e0 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 5 Aug 2020 10:59:08 +0200 Subject: [PATCH 2/3] [BEAM-601] Add kinesis integration test to Java postcommit --- build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle b/build.gradle index 93ca237de5f3..8e500a9d83be 100644 --- a/build.gradle +++ b/build.gradle @@ -178,6 +178,7 @@ task javaPostCommit() { dependsOn ":sdks:java:extensions:google-cloud-platform-core:postCommit" dependsOn ":sdks:java:extensions:zetasketch:postCommit" dependsOn ":sdks:java:io:google-cloud-platform:postCommit" + dependsOn ":sdks:java:io:kinesis:integrationTest" } task sqlPostCommit() { From 87484f4281ca8d3d8893a019182ef762d18151f8 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Thu, 6 Aug 2020 21:02:42 +0200 Subject: [PATCH 3/3] Fixes after Alexey's code review --- .../beam/gradle/BeamModulePlugin.groovy | 2 + sdks/java/io/amazon-web-services/build.gradle | 2 +- .../java/io/amazon-web-services2/build.gradle | 2 +- sdks/java/io/kinesis/build.gradle | 2 +- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 4 +- .../beam/sdk/io/kinesis/KinesisIOIT.java | 47 +++++++------------ .../sdk/io/kinesis/KinesisTestOptions.java | 6 +++ 7 files changed, 31 insertions(+), 34 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 3622764fa084..0e75c864b5b5 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -416,6 +416,7 @@ class BeamModulePlugin implements Plugin { def quickcheck_version = "0.8" def spark_version = "2.4.6" def spotbugs_version = "4.0.6" + def testcontainers_localstack_version = "1.14.3" // A map of maps containing common libraries used per language. To use: // dependencies { @@ -564,6 +565,7 @@ class BeamModulePlugin implements Plugin { spark_sql : "org.apache.spark:spark-sql_2.11:$spark_version", spark_streaming : "org.apache.spark:spark-streaming_2.11:$spark_version", stax2_api : "org.codehaus.woodstox:stax2-api:3.1.4", + testcontainers_localstack : "org.testcontainers:localstack:$testcontainers_localstack_version", vendored_bytebuddy_1_10_8 : "org.apache.beam:beam-vendor-bytebuddy-1_10_8:0.1", vendored_grpc_1_26_0 : "org.apache.beam:beam-vendor-grpc-1_26_0:0.3", vendored_guava_26_0_jre : "org.apache.beam:beam-vendor-guava-26_0-jre:0.1", diff --git a/sdks/java/io/amazon-web-services/build.gradle b/sdks/java/io/amazon-web-services/build.gradle index 437a5d4541f9..0257f537102d 100644 --- a/sdks/java/io/amazon-web-services/build.gradle +++ b/sdks/java/io/amazon-web-services/build.gradle @@ -54,9 +54,9 @@ dependencies { testCompile library.java.hamcrest_library testCompile library.java.mockito_core testCompile library.java.junit + testCompile library.java.testcontainers_localstack testCompile "org.assertj:assertj-core:3.11.1" testCompile 'org.elasticmq:elasticmq-rest-sqs_2.12:0.14.1' - testCompile 'org.testcontainers:localstack:1.11.2' testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/amazon-web-services2/build.gradle b/sdks/java/io/amazon-web-services2/build.gradle index 784624a90c5b..321aa2b01367 100644 --- a/sdks/java/io/amazon-web-services2/build.gradle +++ b/sdks/java/io/amazon-web-services2/build.gradle @@ -59,8 +59,8 @@ dependencies { testCompile library.java.hamcrest_library testCompile library.java.powermock testCompile library.java.powermock_mockito + testCompile library.java.testcontainers_localstack testCompile "org.assertj:assertj-core:3.11.1" - testCompile 'org.testcontainers:testcontainers:1.11.3' testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/kinesis/build.gradle b/sdks/java/io/kinesis/build.gradle index 1f62346aab44..487ce569bace 100644 --- a/sdks/java/io/kinesis/build.gradle +++ b/sdks/java/io/kinesis/build.gradle @@ -49,8 +49,8 @@ dependencies { testCompile library.java.hamcrest_library testCompile library.java.powermock testCompile library.java.powermock_mockito + testCompile library.java.testcontainers_localstack testCompile "org.assertj:assertj-core:3.11.1" - testCompile 'org.testcontainers:localstack:1.11.2' testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index 62f3e141bae5..ba18cd0d2831 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -421,7 +421,7 @@ public Read withAWSClientsProvider( *

The {@code serviceEndpoint} sets an alternative service host. This is useful to execute * the tests with Kinesis service emulator. * - *

The {@code veriftCertificate} disables or enables certificate verification. Never set it + *

The {@code verifyCertificate} disables or enables certificate verification. Never set it * to false in production. */ public Read withAWSClientsProvider( @@ -700,7 +700,7 @@ public Write withAWSClientsProvider( *

The {@code serviceEndpoint} sets an alternative service host. This is useful to execute * the tests with Kinesis service emulator. * - *

The {@code veriftCertificate} disables or enables certificate verification. Never set it + *

The {@code verifyCertificate} disables or enables certificate verification. Never set it * to false in production. */ public Write withAWSClientsProvider( diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java index e53a12d8f065..9daf33b662c9 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisIOIT.java @@ -29,7 +29,6 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.Random; -import java.util.UUID; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.common.TestRow; @@ -42,6 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -62,32 +62,31 @@ public class KinesisIOIT implements Serializable { @Rule public TestPipeline pipelineWrite = TestPipeline.create(); @Rule public TestPipeline pipelineRead = TestPipeline.create(); - private static LocalStackContainer localstackContainer; - private static String streamName; - private static AmazonKinesis kinesisClient; - private static KinesisTestOptions options; + private static AmazonKinesis kinesisClient; + private static LocalStackContainer localstackContainer; + private static Instant now = Instant.now(); + @BeforeClass public static void setup() throws Exception { PipelineOptionsFactory.register(KinesisTestOptions.class); options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class); - if (doUseLocalstack()) { + if (options.getUseLocalstack()) { setupLocalstack(); + kinesisClient = createKinesisClient(); + createStream(options.getAwsKinesisStream()); } - kinesisClient = createKinesisClient(); - streamName = "beam_test_kinesis" + UUID.randomUUID(); - createStream(); } @AfterClass public static void teardown() { - if (doUseLocalstack()) { + if (options.getUseLocalstack()) { + kinesisClient.deleteStream(options.getAwsKinesisStream()); System.clearProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY); System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY); localstackContainer.stop(); } - kinesisClient.deleteStream(streamName); } /** Test which write and then read data for a Kinesis stream. */ @@ -106,7 +105,7 @@ private void runWrite() { .apply( "Write to Kinesis", KinesisIO.write() - .withStreamName(streamName) + .withStreamName(options.getAwsKinesisStream()) .withPartitioner(new RandomPartitioner()) .withAWSClientsProvider( options.getAwsAccessKey(), @@ -123,7 +122,7 @@ private void runRead() { PCollection output = pipelineRead.apply( KinesisIO.read() - .withStreamName(streamName) + .withStreamName(options.getAwsKinesisStream()) .withAWSClientsProvider( options.getAwsAccessKey(), options.getAwsSecretKey(), @@ -133,7 +132,8 @@ private void runRead() { .withMaxNumRecords(options.getNumberOfRecords()) // to prevent endless running in case of error .withMaxReadTime(Duration.standardMinutes(10L)) - .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) + .withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP) + .withInitialTimestampInStream(now) .withRequestRecordsLimit(1000)); PAssert.thatSingleton(output.apply("Count All", Count.globally())) @@ -152,6 +152,9 @@ private void runRead() { /** Necessary setup for localstack environment. */ private static void setupLocalstack() { + // For some unclear reason localstack requires a timestamp in seconds + now = Instant.ofEpochMilli(Long.divideUnsigned(now.getMillis(), 1000L)); + System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true"); @@ -201,7 +204,7 @@ private static AmazonKinesis createKinesisClient() { return clientBuilder.build(); } - private static void createStream() throws Exception { + private static void createStream(String streamName) throws Exception { kinesisClient.createStream(streamName, 1); int repeats = 10; for (int i = 0; i <= repeats; ++i) { @@ -217,20 +220,6 @@ private static void createStream() throws Exception { } } - /** Check whether pipeline options were provided. If not, use localstack container. */ - private static boolean doUseLocalstack() { - KinesisTestOptions defaults = PipelineOptionsFactory.fromArgs().as(KinesisTestOptions.class); - return defaults.getAwsAccessKey().equals(options.getAwsAccessKey()) - && defaults.getAwsSecretKey().equals(options.getAwsSecretKey()) - && defaults.getAwsKinesisStream().equals(options.getAwsKinesisStream()) - && defaults.getAwsKinesisRegion().equals(options.getAwsKinesisRegion()) - && defaults.getNumberOfShards().equals(options.getNumberOfShards()) - && defaults.getNumberOfRecords().equals(options.getNumberOfRecords()) - && (options.getAwsServiceEndpoint() == null - || options.getAwsServiceEndpoint().equals(defaults.getAwsServiceEndpoint())) - && defaults.getAwsVerifyCertificate().equals(options.getAwsVerifyCertificate()); - } - /** Produces test rows. */ private static class ConvertToBytes extends DoFn { @ProcessElement diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java index ff6ef924393f..fabdca3e3b4e 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java @@ -72,4 +72,10 @@ public interface KinesisTestOptions extends TestPipelineOptions { Integer getNumberOfRecords(); void setNumberOfRecords(Integer count); + + @Description("Use localstack. Disable to test with real Kinesis") + @Default.Boolean(true) + Boolean getUseLocalstack(); + + void setUseLocalstack(Boolean useLocalstack); }