From 8b1ce67c173c6aa06122ee1df5146639fd5c3e22 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Mon, 27 Jul 2020 16:38:47 +0200 Subject: [PATCH 01/19] [BEAM-10138][BEAM-10137] Create kinesis expansion-service --- .../io/kinesis/expansion-service/build.gradle | 38 +++++++++++++++++++ settings.gradle | 1 + 2 files changed, 39 insertions(+) create mode 100644 sdks/java/io/kinesis/expansion-service/build.gradle diff --git a/sdks/java/io/kinesis/expansion-service/build.gradle b/sdks/java/io/kinesis/expansion-service/build.gradle new file mode 100644 index 000000000000..3315f24e0c8c --- /dev/null +++ b/sdks/java/io/kinesis/expansion-service/build.gradle @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature( + enableChecker: true, + automaticModuleName: 'org.apache.beam.sdk.io.kinesis.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Kinesis :: Expansion Service" +ext.summary = "Expansion service serving KinesisIO" + +dependencies { + compile project(":sdks:java:expansion-service") + compile project(":sdks:java:io:kinesis") + runtime library.java.slf4j_jdk14 +} diff --git a/settings.gradle b/settings.gradle index 3d4bd3be7735..63dd10dbdd72 100644 --- a/settings.gradle +++ b/settings.gradle @@ -120,6 +120,7 @@ include ":sdks:java:io:jdbc" include ":sdks:java:io:jms" include ":sdks:java:io:kafka" include ":sdks:java:io:kinesis" +include ":sdks:java:io:kinesis:expansion-service" include ":sdks:java:io:kudu" include ":sdks:java:io:mongodb" include ":sdks:java:io:mqtt" From 63a2bfc712ed10b3d336b95f140ac9f2b6c0e9aa Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Mon, 27 Jul 2020 16:40:59 +0200 Subject: [PATCH 02/19] [BEAM-10138] Add Cross-language KinesisWrite external transform --- .../io/kinesis/KinesisTransformRegistrar.java | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java new file mode 100644 index 000000000000..0cb7c66c4403 --- /dev/null +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import com.amazonaws.regions.Regions; +import com.google.auto.service.AutoService; +import java.util.Map; +import java.util.Properties; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Exposes {@link KinesisIO.Write} as an external transform for cross-language usage. 
*/ +@Experimental(Kind.PORTABILITY) +@AutoService(ExternalTransformRegistrar.class) +public class KinesisTransformRegistrar implements ExternalTransformRegistrar { + public static final String WRITE_URN = "beam:external:java:kinesis:write:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(WRITE_URN, WriteBuilder.class); + } + + private abstract static class CrossLanguageConfiguration { + String streamName; + String awsAccessKey; + String awsSecretKey; + Regions region; + @Nullable String serviceEndpoint; + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public void setAwsAccessKey(String awsAccessKey) { + this.awsAccessKey = awsAccessKey; + } + + public void setAwsSecretKey(String awsSecretKey) { + this.awsSecretKey = awsSecretKey; + } + + public void setRegion(String region) { + this.region = Regions.fromName(region); + } + + public void setServiceEndpoint(@Nullable String serviceEndpoint) { + this.serviceEndpoint = serviceEndpoint; + } + } + + @Experimental(Kind.PORTABILITY) + public static class WriteBuilder + implements ExternalTransformBuilder, PDone> { + + public static class Configuration extends CrossLanguageConfiguration { + private Properties producerProperties; + private String partitionKey; + + public void setProducerProperties(Iterable> producerProperties) { + if (producerProperties != null) { + Properties properties = new Properties(); + producerProperties.forEach(kv -> properties.setProperty(kv.getKey(), kv.getValue())); + this.producerProperties = properties; + } + } + + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + } + + @Override + public PTransform, PDone> buildExternal(Configuration configuration) { + KinesisIO.Write writeTransform = + KinesisIO.write() + .withStreamName(configuration.streamName) + .withAWSClientsProvider( + configuration.awsAccessKey, + configuration.awsSecretKey, + configuration.region, + 
configuration.serviceEndpoint) + .withPartitionKey(configuration.partitionKey); + + if (configuration.producerProperties != null) { + writeTransform = writeTransform.withProducerProperties(configuration.producerProperties); + } + + return writeTransform; + } + } +} From 2e42af17518cb42ed75d5556b32311289f05c07b Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Mon, 27 Jul 2020 17:11:35 +0200 Subject: [PATCH 03/19] [BEAM-10137] Generify KinesisIO.Read --- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 119 +++++++++++------- 1 file changed, 76 insertions(+), 43 deletions(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index ba18cd0d2831..ff7d23295b89 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -43,13 +43,18 @@ import java.util.function.Supplier; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -295,14 +300,16 @@ public final class KinesisIO { private static final int DEFAULT_NUM_RETRIES = 
6; /** Returns a new {@link Read} transform for reading from Kinesis. */ - public static Read read() { - return new AutoValue_KinesisIO_Read.Builder() - .setMaxNumRecords(Long.MAX_VALUE) - .setUpToDateThreshold(Duration.ZERO) - .setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy()) - .setRateLimitPolicyFactory(RateLimitPolicyFactory.withoutLimiter()) - .setMaxCapacityPerShard(ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD) - .build(); + public static Read read() { + return Read.newBuilder().setCoder(KinesisRecordCoder.of()).build(); + } + + /** + * A {@link PTransform} to read from Kinesis stream as bytes without metadata and returns a {@link + * PCollection} of {@code byte[]}. + */ + public static Read readData() { + return Read.newBuilder(KinesisRecord::getDataAsBytes).setCoder(ByteArrayCoder.of()).build(); } /** A {@link PTransform} writing data to Kinesis. */ @@ -312,7 +319,7 @@ public static Write write() { /** Implementation of {@link #read}. */ @AutoValue - public abstract static class Read extends PTransform> { + public abstract static class Read extends PTransform> { abstract @Nullable String getStreamName(); @@ -334,41 +341,63 @@ public abstract static class Read extends PTransform getCoder(); + + abstract @Nullable SerializableFunction getParseFn(); + + abstract Builder toBuilder(); + + static Builder newBuilder(SerializableFunction parseFn) { + return new AutoValue_KinesisIO_Read.Builder() + .setParseFn(parseFn) + .setMaxNumRecords(Long.MAX_VALUE) + .setUpToDateThreshold(Duration.ZERO) + .setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy()) + .setRateLimitPolicyFactory(RateLimitPolicyFactory.withoutLimiter()) + .setMaxCapacityPerShard(ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD); + } + + static Builder newBuilder() { + return newBuilder(x -> x); + } @AutoValue.Builder - abstract static class Builder { + abstract static class Builder { - abstract Builder 
setStreamName(String streamName); - abstract Builder setInitialPosition(StartingPoint startingPoint); + abstract Builder setInitialPosition(StartingPoint startingPoint); - abstract Builder setAWSClientsProvider(AWSClientsProvider clientProvider); + abstract Builder setAWSClientsProvider(AWSClientsProvider clientProvider); - abstract Builder setMaxNumRecords(long maxNumRecords); + abstract Builder setMaxNumRecords(long maxNumRecords); - abstract Builder setMaxReadTime(Duration maxReadTime); + abstract Builder setMaxReadTime(Duration maxReadTime); - abstract Builder setUpToDateThreshold(Duration upToDateThreshold); + abstract Builder setUpToDateThreshold(Duration upToDateThreshold); - abstract Builder setRequestRecordsLimit(Integer limit); + abstract Builder setRequestRecordsLimit(Integer limit); - abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory); + abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory); - abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); + abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); - abstract Builder setMaxCapacityPerShard(Integer maxCapacity); + abstract Builder setMaxCapacityPerShard(Integer maxCapacity); - abstract Read build(); + abstract Builder setParseFn(SerializableFunction parseFn); + + abstract Builder setCoder(Coder coder); + + abstract Read build(); } /** Specify reading from streamName. */ - public Read withStreamName(String streamName) { + public Read withStreamName(String streamName) { return toBuilder().setStreamName(streamName).build(); } /** Specify reading from some initial position in stream. 
*/ - public Read withInitialPositionInStream(InitialPositionInStream initialPosition) { + public Read withInitialPositionInStream(InitialPositionInStream initialPosition) { return toBuilder().setInitialPosition(new StartingPoint(initialPosition)).build(); } @@ -376,7 +405,7 @@ public Read withInitialPositionInStream(InitialPositionInStream initialPosition) * Specify reading beginning at given {@link Instant}. This {@link Instant} must be in the past, * i.e. before {@link Instant#now()}. */ - public Read withInitialTimestampInStream(Instant initialTimestamp) { + public Read withInitialTimestampInStream(Instant initialTimestamp) { return toBuilder().setInitialPosition(new StartingPoint(initialTimestamp)).build(); } @@ -386,7 +415,7 @@ public Read withInitialTimestampInStream(Instant initialTimestamp) { * communication with Kinesis. You should use this method if {@link * Read#withAWSClientsProvider(String, String, Regions)} does not suit your needs. */ - public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) { + public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) { return toBuilder().setAWSClientsProvider(awsClientsProvider).build(); } @@ -395,7 +424,8 @@ public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) { * sophisticated credential protocol, then you should look at {@link * Read#withAWSClientsProvider(AWSClientsProvider)}. */ - public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) { + public Read withAWSClientsProvider( + String awsAccessKey, String awsSecretKey, Regions region) { return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null); } @@ -407,7 +437,7 @@ public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Reg *

The {@code serviceEndpoint} sets an alternative service host. This is useful to execute * the tests with a kinesis service emulator. */ - public Read withAWSClientsProvider( + public Read withAWSClientsProvider( String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) { return withAWSClientsProvider( new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint)); @@ -436,14 +466,14 @@ public Read withAWSClientsProvider( } /** Specifies to read at most a given number of records. */ - public Read withMaxNumRecords(long maxNumRecords) { + public Read withMaxNumRecords(long maxNumRecords) { checkArgument( maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords); return toBuilder().setMaxNumRecords(maxNumRecords).build(); } /** Specifies to read records during {@code maxReadTime}. */ - public Read withMaxReadTime(Duration maxReadTime) { + public Read withMaxReadTime(Duration maxReadTime) { checkArgument(maxReadTime != null, "maxReadTime can not be null"); return toBuilder().setMaxReadTime(maxReadTime).build(); } @@ -454,7 +484,7 @@ public Read withMaxReadTime(Duration maxReadTime) { * decide to scale the amount of resources allocated to the pipeline in order to speed up * ingestion. */ - public Read withUpToDateThreshold(Duration upToDateThreshold) { + public Read withUpToDateThreshold(Duration upToDateThreshold) { checkArgument(upToDateThreshold != null, "upToDateThreshold can not be null"); return toBuilder().setUpToDateThreshold(upToDateThreshold).build(); } @@ -465,14 +495,14 @@ public Read withUpToDateThreshold(Duration upToDateThreshold) { * prevent shard overloading. 
More details can be found here: API_GetRecords */ - public Read withRequestRecordsLimit(int limit) { + public Read withRequestRecordsLimit(int limit) { checkArgument(limit > 0, "limit must be positive, but was: %s", limit); checkArgument(limit <= 10_000, "limit must be up to 10,000, but was: %s", limit); return toBuilder().setRequestRecordsLimit(limit).build(); } /** Specifies the {@code WatermarkPolicyFactory} as ArrivalTimeWatermarkPolicyFactory. */ - public Read withArrivalTimeWatermarkPolicy() { + public Read withArrivalTimeWatermarkPolicy() { return toBuilder() .setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy()) .build(); @@ -484,7 +514,7 @@ public Read withArrivalTimeWatermarkPolicy() { *

{@param watermarkIdleDurationThreshold} Denotes the duration for which the watermark can * be idle. */ - public Read withArrivalTimeWatermarkPolicy(Duration watermarkIdleDurationThreshold) { + public Read withArrivalTimeWatermarkPolicy(Duration watermarkIdleDurationThreshold) { return toBuilder() .setWatermarkPolicyFactory( WatermarkPolicyFactory.withArrivalTimePolicy(watermarkIdleDurationThreshold)) @@ -492,7 +522,7 @@ public Read withArrivalTimeWatermarkPolicy(Duration watermarkIdleDurationThresho } /** Specifies the {@code WatermarkPolicyFactory} as ProcessingTimeWatermarkPolicyFactory. */ - public Read withProcessingTimeWatermarkPolicy() { + public Read withProcessingTimeWatermarkPolicy() { return toBuilder() .setWatermarkPolicyFactory(WatermarkPolicyFactory.withProcessingTimePolicy()) .build(); @@ -503,13 +533,13 @@ public Read withProcessingTimeWatermarkPolicy() { * * @param watermarkPolicyFactory Custom Watermark policy factory. */ - public Read withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFactory) { + public Read withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFactory) { checkArgument(watermarkPolicyFactory != null, "watermarkPolicyFactory cannot be null"); return toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory).build(); } /** Specifies a fixed delay rate limit policy with the default delay of 1 second. */ - public Read withFixedDelayRateLimitPolicy() { + public Read withFixedDelayRateLimitPolicy() { return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay()).build(); } @@ -518,7 +548,7 @@ public Read withFixedDelayRateLimitPolicy() { * * @param delay Denotes the fixed delay duration. 
*/ - public Read withFixedDelayRateLimitPolicy(Duration delay) { + public Read withFixedDelayRateLimitPolicy(Duration delay) { checkArgument(delay != null, "delay cannot be null"); return toBuilder() .setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay(delay)) @@ -532,7 +562,7 @@ public Read withFixedDelayRateLimitPolicy(Duration delay) { * * @param delay The function to invoke to get the next delay duration. */ - public Read withDynamicDelayRateLimitPolicy(Supplier delay) { + public Read withDynamicDelayRateLimitPolicy(Supplier delay) { checkArgument(delay != null, "delay cannot be null"); return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withDelay(delay)).build(); } @@ -542,19 +572,19 @@ public Read withDynamicDelayRateLimitPolicy(Supplier delay) { * * @param rateLimitPolicyFactory Custom rate limit policy factory. */ - public Read withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { + public Read withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { checkArgument(rateLimitPolicyFactory != null, "rateLimitPolicyFactory cannot be null"); return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build(); } /** Specifies the maximum number of messages per one shard. 
*/ - public Read withMaxCapacityPerShard(Integer maxCapacity) { + public Read withMaxCapacityPerShard(Integer maxCapacity) { checkArgument(maxCapacity > 0, "maxCapacity must be positive, but was: %s", maxCapacity); return toBuilder().setMaxCapacityPerShard(maxCapacity).build(); } @Override - public PCollection expand(PBegin input) { + public PCollection expand(PBegin input) { Unbounded unbounded = org.apache.beam.sdk.io.Read.from( new KinesisSource( @@ -574,7 +604,10 @@ public PCollection expand(PBegin input) { unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords()); } - return input.apply(transform); + return input + .apply(transform) + .apply(MapElements.into(new TypeDescriptor() {}).via(getParseFn())) + .setCoder(getCoder()); } } From 7fc3dddb21ffd0d5bc9b0740be58cbbfacefa45a Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Mon, 27 Jul 2020 17:12:01 +0200 Subject: [PATCH 04/19] [BEAM-10137] Add Cross-language KinesisIO.Read external transform registrar --- .../io/kinesis/KinesisTransformRegistrar.java | 153 +++++++++++++++++- 1 file changed, 150 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java index 0cb7c66c4403..3fac626a8bb0 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.kinesis; import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.google.auto.service.AutoService; import java.util.Map; import java.util.Properties; @@ -28,19 +29,26 @@ import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; import 
org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Duration; +import org.joda.time.Instant; -/** Exposes {@link KinesisIO.Write} as an external transform for cross-language usage. */ +/** + * Exposes {@link KinesisIO.Write} and {@link KinesisIO.Read} as an external transform for + * cross-language usage. + */ @Experimental(Kind.PORTABILITY) @AutoService(ExternalTransformRegistrar.class) public class KinesisTransformRegistrar implements ExternalTransformRegistrar { public static final String WRITE_URN = "beam:external:java:kinesis:write:v1"; + public static final String READ_DATA_URN = "beam:external:java:kinesis:read_data:v1"; @Override public Map> knownBuilders() { - return ImmutableMap.of(WRITE_URN, WriteBuilder.class); + return ImmutableMap.of(WRITE_URN, WriteBuilder.class, READ_DATA_URN, ReadDataBuilder.class); } private abstract static class CrossLanguageConfiguration { @@ -71,7 +79,6 @@ public void setServiceEndpoint(@Nullable String serviceEndpoint) { } } - @Experimental(Kind.PORTABILITY) public static class WriteBuilder implements ExternalTransformBuilder, PDone> { @@ -111,4 +118,144 @@ public PTransform, PDone> buildExternal(Configuration config return writeTransform; } } + + public static class ReadDataBuilder + implements ExternalTransformBuilder< + ReadDataBuilder.Configuration, PBegin, PCollection> { + + public static class Configuration extends CrossLanguageConfiguration { + @Nullable private Long maxNumRecords; + @Nullable private Duration maxReadTime; + @Nullable private InitialPositionInStream initialPositionInStream; + @Nullable private Instant initialTimestampInStream; + @Nullable private Integer requestRecordsLimit; + @Nullable private Duration upToDateThreshold; + @Nullable private Long maxCapacityPerShard; + @Nullable private 
WatermarkPolicy watermarkPolicy; + @Nullable private Duration watermarkIdleDurationThreshold; + @Nullable private Duration rateLimit; + + public void setMaxNumRecords(@Nullable Long maxNumRecords) { + this.maxNumRecords = maxNumRecords; + } + + public void setMaxReadTime(@Nullable Long maxReadTime) { + if (maxReadTime != null) { + this.maxReadTime = Duration.standardSeconds(maxReadTime); + } + } + + public void setInitialPositionInStream(@Nullable String initialPositionInStream) { + if (initialPositionInStream != null) { + this.initialPositionInStream = InitialPositionInStream.valueOf(initialPositionInStream); + } + } + + public void setInitialTimestampInStream(@Nullable Long initialTimestampInStream) { + if (initialTimestampInStream != null) { + this.initialTimestampInStream = Instant.ofEpochSecond(initialTimestampInStream); + } + } + + public void setRequestRecordsLimit(@Nullable Long requestRecordsLimit) { + if (requestRecordsLimit != null) { + this.requestRecordsLimit = requestRecordsLimit.intValue(); + } + } + + public void setUpToDateThreshold(@Nullable Long upToDateThreshold) { + if (upToDateThreshold != null) { + this.upToDateThreshold = Duration.standardSeconds(upToDateThreshold); + } + } + + public void setMaxCapacityPerShard(@Nullable Long maxCapacityPerShard) { + this.maxCapacityPerShard = maxCapacityPerShard; + } + + public void setWatermarkPolicy(@Nullable String watermarkPolicy) { + if (watermarkPolicy != null) { + this.watermarkPolicy = WatermarkPolicy.valueOf(watermarkPolicy); + } + } + + public void setWatermarkIdleDurationThreshold(@Nullable Long watermarkIdleDurationThreshold) { + if (watermarkIdleDurationThreshold != null) { + this.watermarkIdleDurationThreshold = + Duration.standardSeconds(watermarkIdleDurationThreshold); + } + } + + public void setRateLimit(@Nullable Long rateLimit) { + if (rateLimit != null) { + this.rateLimit = Duration.standardSeconds(rateLimit); + } + } + } + + private enum WatermarkPolicy { + ARRIVAL_TIME, + 
PROCESSING_TIME + } + + @Override + public PTransform> buildExternal( + ReadDataBuilder.Configuration configuration) { + KinesisIO.Read readTransform = + KinesisIO.readData() + .withStreamName(configuration.streamName) + .withAWSClientsProvider( + configuration.awsAccessKey, + configuration.awsSecretKey, + configuration.region, + configuration.serviceEndpoint); + + if (configuration.maxNumRecords != null) { + readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); + } + if (configuration.upToDateThreshold != null) { + readTransform = readTransform.withUpToDateThreshold(configuration.upToDateThreshold); + } + if (configuration.maxCapacityPerShard != null) { + readTransform = + readTransform.withMaxCapacityPerShard(configuration.maxCapacityPerShard.intValue()); + } + if (configuration.watermarkPolicy != null) { + switch (configuration.watermarkPolicy) { + case ARRIVAL_TIME: + readTransform = + configuration.watermarkIdleDurationThreshold != null + ? readTransform.withArrivalTimeWatermarkPolicy( + configuration.watermarkIdleDurationThreshold) + : readTransform.withArrivalTimeWatermarkPolicy(); + break; + case PROCESSING_TIME: + readTransform = readTransform.withProcessingTimeWatermarkPolicy(); + break; + default: + throw new RuntimeException( + String.format( + "Unsupported watermark policy type: %s", configuration.watermarkPolicy)); + } + } + if (configuration.rateLimit != null) { + readTransform = readTransform.withFixedDelayRateLimitPolicy(configuration.rateLimit); + } + if (configuration.maxReadTime != null) { + readTransform = readTransform.withMaxReadTime(configuration.maxReadTime); + } + if (configuration.initialPositionInStream != null) { + readTransform = + readTransform.withInitialPositionInStream(configuration.initialPositionInStream); + } + if (configuration.requestRecordsLimit != null) { + readTransform = readTransform.withRequestRecordsLimit(configuration.requestRecordsLimit); + } + if (configuration.initialTimestampInStream != 
null) { + readTransform = + readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); + } + return readTransform; + } + } } From 71bdd3099887602a7e4e6bc2e236b4967146b53b Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Fri, 24 Jul 2020 14:54:01 +0200 Subject: [PATCH 05/19] [BEAM-10137][BEAM-10138] Add Python wrapper for Cross-language Java KinesisIO --- sdks/python/apache_beam/io/kinesis.py | 308 ++++++++++++++++++++++++++ 1 file changed, 308 insertions(+) create mode 100644 sdks/python/apache_beam/io/kinesis.py diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py new file mode 100644 index 000000000000..7147bfbff611 --- /dev/null +++ b/sdks/python/apache_beam/io/kinesis.py @@ -0,0 +1,308 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PTransforms for supporting Kinesis streaming in Python pipelines. + + These transforms are currently supported by Beam Flink and Spark portable + runners. + + **Setup** + + Transforms provided in this module are cross-language transforms + implemented in the Beam Java SDK. During the pipeline construction, Python SDK + will connect to a Java expansion service to expand these transforms. 
+ To facilitate this, a small amount of setup is needed before using these + transforms in a Beam Python pipeline. + + There are several ways to set up cross-language Kinesis transforms. + + * Option 1: use the default expansion service + * Option 2: specify a custom expansion service + + See below for details regarding each of these options. + + *Option 1: Use the default expansion service* + + This is the recommended and easiest setup option for using Python Kinesis + transforms. This option is only available for Beam 2.24.0 and later. + + This option requires the following prerequisites before running the Beam + pipeline. + + * Install a Java runtime on the computer where the pipeline is constructed + and make sure that the 'java' command is available. + + In this option, Python SDK will either download (for released Beam version) or + build (when running from a Beam Git clone) an expansion service jar and use + that to expand transforms. Currently Kinesis transforms use the + 'beam-sdks-java-io-kinesis-expansion-service' jar for this purpose. + + *Option 2: specify a custom expansion service* + + In this option, you start up your own expansion service and provide that as + a parameter when using the transforms provided in this module. + + This option requires the following prerequisites before running the Beam + pipeline. + + * Start up your own expansion service. + * Update your pipeline to provide the expansion service address when + initiating Kinesis transforms provided in this module. + + Flink users can use the built-in Expansion Service of the Flink Runner's + Job Server. If you start Flink's Job Server, the expansion service will be + started on port 8097. For a different address, please set the + expansion_service parameter. 
+ + **More information** + + For more information regarding cross-language transforms see: + - https://beam.apache.org/roadmap/portability/ + + For more information specific to Flink runner see: + - https://beam.apache.org/documentation/runners/flink/ +""" + +# pytype: skip-file + +from __future__ import absolute_import + +import time +from typing import List +from typing import NamedTuple +from typing import Optional +from typing import Tuple + +from past.builtins import unicode + +from apache_beam import BeamJarExpansionService +from apache_beam import ExternalTransform +from apache_beam import NamedTupleBasedPayloadBuilder + +__all__ = [ + 'WriteToKinesis', + 'ReadDataFromKinesis', + 'InitialPositionInStream', + 'WatermarkPolicy', +] + + +def default_io_expansion_service(): + return BeamJarExpansionService( + 'sdks:java:io:kinesis:expansion-service:shadowJar') + + +WriteToKinesisSchema = NamedTuple( + 'WriteToKinesisSchema', + [ + ('stream_name', unicode), + ('aws_access_key', unicode), + ('aws_secret_key', unicode), + ('region', unicode), + ('partition_key', unicode), + ('service_endpoint', Optional[unicode]), + ('producer_properties', Optional[List[Tuple[unicode, unicode]]]), + ], +) + + +class WriteToKinesis(ExternalTransform): + """ + An external PTransform which writes byte array stream to Amazon Kinesis. + + Experimental; no backwards compatibility guarantees. + """ + URN = 'beam:external:java:kinesis:write:v1' + + def __init__( + self, + stream_name, + aws_access_key, + aws_secret_key, + region, + partition_key, + service_endpoint=None, + producer_properties=None, + expansion_service=None, + ): + """ + Initializes a write operation to Kinesis. + + :param stream_name: Kinesis stream name. + :param aws_access_key: Kinesis access key. + :param aws_secret_key: Kinesis access key secret. + :param region: AWS region. Example: 'us-east-1'. + :param service_endpoint: Kinesis service endpoint + :param partition_key: Specify default partition key. 
+ :param producer_properties: Specify the configuration properties for Kinesis + Producer Library (KPL) as List[KV[string, string]]. + Example: [('CollectionMaxCount', '1000'), ('ConnectTimeout', '10000')] + :param expansion_service: The address (host:port) of the ExpansionService. + """ + super(WriteToKinesis, self).__init__( + self.URN, + NamedTupleBasedPayloadBuilder( + WriteToKinesisSchema( + stream_name=stream_name, + aws_access_key=aws_access_key, + aws_secret_key=aws_secret_key, + region=region, + partition_key=partition_key, + service_endpoint=service_endpoint, + producer_properties=list(producer_properties.items()) + if producer_properties else None, + )), + expansion_service or default_io_expansion_service(), + ) + + +ReadFromKinesisSchema = NamedTuple( + 'ReadFromKinesisSchema', + [ + ('stream_name', unicode), + ('aws_access_key', unicode), + ('aws_secret_key', unicode), + ('region', unicode), + ('service_endpoint', Optional[unicode]), + ('max_num_records', Optional[int]), + ('max_read_time', Optional[int]), + ('initial_position_in_stream', Optional[unicode]), + ('initial_timestamp_in_stream', Optional[int]), + ('request_records_limit', Optional[int]), + ('up_to_date_threshold', Optional[int]), + ('max_capacity_per_shard', Optional[int]), + ('watermark_policy', Optional[unicode]), + ('watermark_idle_duration_threshold', Optional[int]), + ('rate_limit', Optional[int]), + ], +) + + +class InitialPositionInStream: + LATEST = 'LATEST' + TRIM_HORIZON = 'TRIM_HORIZON' + AT_TIMESTAMP = 'AT_TIMESTAMP' + + +class WatermarkPolicy: + PROCESSING_TYPE = 'PROCESSING_TYPE' + ARRIVAL_TIME = 'ARRIVAL_TIME' + + +class ReadDataFromKinesis(ExternalTransform): + """ + An external PTransform which reads byte array stream from Amazon Kinesis. + + Experimental; no backwards compatibility guarantees. 
+ """ + URN = 'beam:external:java:kinesis:read_data:v1' + + def __init__( + self, + stream_name, + aws_access_key, + aws_secret_key, + region, + service_endpoint=None, + max_num_records=None, + max_read_time=None, + initial_position_in_stream=None, + initial_timestamp_in_stream=None, + request_records_limit=None, + up_to_date_threshold=None, + max_capacity_per_shard=None, + watermark_policy=None, + watermark_idle_duration_threshold=None, + rate_limit=None, + expansion_service=None, + ): + """ + Initializes a read operation from Kinesis. + + :param stream_name: Kinesis stream name. + :param aws_access_key: Kinesis access key. + :param aws_secret_key: Kinesis access key secret. + :param region: AWS region. Example: 'us-east-1'. + :param service_endpoint: Kinesis service endpoint + :param max_num_records: Specifies to read at most a given number of records. + Must be greater than 0. + :param max_read_time: Specifies to read records during x seconds. + :param initial_timestamp_in_stream: Specify reading beginning at the given + timestamp in seconds. Must be in the past. + :param initial_position_in_stream: Specify reading from some initial + position in stream. Possible values: + LATEST - Start after the most recent data record (fetch new data). + TRIM_HORIZON - Start from the oldest available data record. + AT_TIMESTAMP - Start from the record at or after the specified + server-side timestamp. + :param request_records_limit: Specifies the maximum number of records in + GetRecordsResult returned by GetRecords call which is limited by 10K + records. If should be adjusted according to average size of data record + to prevent shard overloading. More at: + docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html + :param up_to_date_threshold: Specifies how late in seconds records consumed + by this source can be to still be considered on time. Defaults to zero. + :param max_capacity_per_shard: Specifies the maximum number of messages per + one shard. 
Defaults to 10'000. + :param watermark_policy: Specifies the watermark policy. Possible values: + PROCESSING_TYPE, ARRIVAL_TIME. Defaults to ARRIVAL_TIME. + :param watermark_idle_duration_threshold: Use only when watermark policy is + ARRIVAL_TIME. Denotes the duration for which the watermark can be idle. + Passed in seconds. + :param rate_limit: Sets fixed rate policy for given seconds value. By + default there is no rate limit. + :param expansion_service: The address (host:port) of the ExpansionService. + """ + if watermark_policy: + assert watermark_policy == WatermarkPolicy.ARRIVAL_TIME or\ + watermark_policy == WatermarkPolicy.PROCESSING_TYPE + + if initial_position_in_stream: + i = initial_position_in_stream + assert i == InitialPositionInStream.AT_TIMESTAMP or \ + i == InitialPositionInStream.LATEST or \ + i == InitialPositionInStream.TRIM_HORIZON + + if request_records_limit: + assert 0 < request_records_limit <= 10000 + + if initial_timestamp_in_stream: + assert initial_timestamp_in_stream < time.time() + + super(ReadDataFromKinesis, self).__init__( + self.URN, + NamedTupleBasedPayloadBuilder( + ReadFromKinesisSchema( + stream_name=stream_name, + aws_access_key=aws_access_key, + aws_secret_key=aws_secret_key, + region=region, + service_endpoint=service_endpoint, + max_num_records=max_num_records, + max_read_time=max_read_time, + initial_position_in_stream=initial_position_in_stream, + initial_timestamp_in_stream=initial_timestamp_in_stream, + request_records_limit=request_records_limit, + up_to_date_threshold=up_to_date_threshold, + max_capacity_per_shard=max_capacity_per_shard, + watermark_policy=watermark_policy, + watermark_idle_duration_threshold= + watermark_idle_duration_threshold, + rate_limit=rate_limit, + )), + expansion_service or default_io_expansion_service(), + ) From e31d47053dc94c6d3795e6a415bb14f895d94ec8 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Fri, 24 Jul 2020 15:00:43 +0200 Subject: [PATCH 06/19] [BEAM-10137][BEAM-10138] Add 
integration tests for Python wrapper for Cross-language KinesisIO --- .../apache/beam/sdk/io/kinesis/KinesisIO.java | 2 +- .../io/kinesis/KinesisTransformRegistrar.java | 13 +- .../io/external/xlang_kinesisio_it_test.py | 304 ++++++++++++++++++ sdks/python/apache_beam/io/kinesis.py | 13 +- .../python/test-suites/portable/common.gradle | 2 + 5 files changed, 328 insertions(+), 6 deletions(-) create mode 100644 sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index ff7d23295b89..5b958d1b9b37 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -454,7 +454,7 @@ public Read withAWSClientsProvider( *

The {@code verifyCertificate} disables or enables certificate verification. Never set it * to false in production. */ - public Read withAWSClientsProvider( + public Read withAWSClientsProvider( String awsAccessKey, String awsSecretKey, Regions region, diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java index 3fac626a8bb0..4b3b53f75235 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -47,7 +47,7 @@ public class KinesisTransformRegistrar implements ExternalTransformRegistrar { public static final String READ_DATA_URN = "beam:external:java:kinesis:read_data:v1"; @Override - public Map> knownBuilders() { + public Map>> knownBuilders() { return ImmutableMap.of(WRITE_URN, WriteBuilder.class, READ_DATA_URN, ReadDataBuilder.class); } @@ -57,6 +57,7 @@ private abstract static class CrossLanguageConfiguration { String awsSecretKey; Regions region; @Nullable String serviceEndpoint; + boolean verifyCertificate; public void setStreamName(String streamName) { this.streamName = streamName; @@ -77,6 +78,10 @@ public void setRegion(String region) { public void setServiceEndpoint(@Nullable String serviceEndpoint) { this.serviceEndpoint = serviceEndpoint; } + + public void setVerifyCertificate(@Nullable Boolean verifyCertificate) { + this.verifyCertificate = verifyCertificate == null ? 
true : verifyCertificate; + } } public static class WriteBuilder @@ -108,7 +113,8 @@ public PTransform, PDone> buildExternal(Configuration config configuration.awsAccessKey, configuration.awsSecretKey, configuration.region, - configuration.serviceEndpoint) + configuration.serviceEndpoint, + configuration.verifyCertificate) .withPartitionKey(configuration.partitionKey); if (configuration.producerProperties != null) { @@ -208,7 +214,8 @@ public PTransform> buildExternal( configuration.awsAccessKey, configuration.awsSecretKey, configuration.region, - configuration.serviceEndpoint); + configuration.serviceEndpoint, + configuration.verifyCertificate); if (configuration.maxNumRecords != null) { readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py new file mode 100644 index 000000000000..047204b4a2d6 --- /dev/null +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -0,0 +1,304 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration test for Python cross-language pipelines for Java KinesisIO. 
+ +If you want to run the tests on localstack then run it just with pipeline +options. + +To test it on a real AWS account you need to pass some additional params, e.g.: +python setup.py nosetests \ +--tests=apache_beam.io.external.xlang_kinesisio_it_test \ +--test-pipeline-options=" + --use_real_aws + --aws_kinesis_stream= + --aws_access_key= + --aws_secret_key= + --aws_region= + --runner=FlinkRunner" +""" + +# pytype: skip-file + +from __future__ import absolute_import + +import argparse +import logging +import time +import unittest +import uuid + +import apache_beam as beam +from apache_beam.io.kinesis import InitialPositionInStream +from apache_beam.io.kinesis import ReadDataFromKinesis +from apache_beam.io.kinesis import WriteToKinesis +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + import boto3 +except ImportError: + boto3 = None + +try: + from testcontainers.core.container import DockerContainer +except ImportError: + DockerContainer = None +# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports + +LOCALSTACK_VERSION = '0.11.3' +NUM_RECORDS = 10 +NOW = time.time() +RECORD = b'record' + str(uuid.uuid4()).encode() + + +@unittest.skipIf(DockerContainer is None, 'testcontainers is not installed.') +@unittest.skipIf(boto3 is None, 'boto3 is not installed.') +@unittest.skipIf( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is + None, + 'Do not run this test on precommit suites.', +) +class CrossLanguageKinesisIOTest(unittest.TestCase): + def test_kinesis_io(self): + self.run_kinesis_write() + # TODO: remove once BEAM-10664 is resolved + if not self.use_localstack: + 
self.run_kinesis_read_data() + + def run_kinesis_write(self): + with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p: + p.not_use_test_runner_api = True + _ = ( + p + | 'Impulse' >> beam.Impulse() + | 'Generate' >> beam.FlatMap(lambda x: range(NUM_RECORDS)) # pylint: disable=range-builtin-not-iterating + | 'Map to bytes' >> + beam.Map(lambda x: RECORD + str(x).encode()).with_output_types(bytes) + | 'WriteToKinesis' >> WriteToKinesis( + stream_name=self.aws_kinesis_stream, + aws_access_key=self.aws_access_key, + aws_secret_key=self.aws_secret_key, + region=self.aws_region, + service_endpoint=self.aws_service_endpoint, + verify_certificate=(not self.use_localstack), + partition_key='1', + )) + + # TODO: Remove once BEAM-10664 is resolved + if self.use_localstack: + records = self.kinesis_helper.read_from_stream(self.aws_kinesis_stream) + self.assertEqual( + sorted(records), + sorted([RECORD + str(i).encode() for i in range(NUM_RECORDS)])) + + def run_kinesis_read_data(self): + records = [RECORD + str(i).encode() for i in range(NUM_RECORDS)] + + with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p: + result = ( + p + | 'ReadFromKinesis' >> ReadDataFromKinesis( + stream_name=self.aws_kinesis_stream, + aws_access_key=self.aws_access_key, + aws_secret_key=self.aws_secret_key, + region=self.aws_region, + service_endpoint=self.aws_service_endpoint, + verify_certificate=not self.use_localstack, + max_num_records=NUM_RECORDS, + max_read_time=300, # 5min + initial_position_in_stream=InitialPositionInStream.AT_TIMESTAMP, + initial_timestamp_in_stream=int(NOW), + ).with_output_types(bytes)) + assert_that(result, equal_to(records)) + + def set_localstack(self): + self.localstack = DockerContainer('localstack/localstack:{}' + .format(LOCALSTACK_VERSION))\ + .with_env('SERVICES', 'kinesis')\ + .with_env('KINESIS_PORT', '4568')\ + .with_env('USE_SSL', 'true')\ + .with_exposed_ports(4568)\ + .with_volume_mapping('/var/run/docker.sock', 
'/var/run/docker.sock', 'rw') + + # Repeat if ReadTimeout is raised. + for i in range(4): + try: + self.localstack.start() + break + except Exception as e: # pylint: disable=bare-except + if i == 3: + logging.error('Could not initialize localstack container') + raise e + + self.aws_service_endpoint = 'https://{}:{}'.format( + self.localstack.get_container_host_ip(), + self.localstack.get_exposed_port('4568'), + ) + + def setUp(self): + parser = argparse.ArgumentParser() + + parser.add_argument( + '--aws_kinesis_stream', + default='beam_kinesis_xlang', + help='Kinesis stream name', + ) + parser.add_argument( + '--aws_access_key', + default='accesskey', + help=('Aws access key'), + ) + parser.add_argument( + '--aws_secret_key', + default='secretkey', + help='Aws secret key', + ) + parser.add_argument( + '--aws_region', + default='us-east-1', + help='Aws region', + ) + parser.add_argument( + '--aws_service_endpoint', + default=None, + help='Url to external aws endpoint', + ) + parser.add_argument( + '--use_real_aws', + default=False, + dest='use_real_aws', + action='store_true', + help='Flag whether to use real aws for the tests purpose', + ) + parser.add_argument( + '--expansion_service', + help='Url to externally launched expansion service.', + ) + + pipeline = TestPipeline() + argv = pipeline.get_full_options_as_args() + + known_args, self.pipeline_args = parser.parse_known_args(argv) + + self.aws_kinesis_stream = known_args.aws_kinesis_stream + self.aws_access_key = known_args.aws_access_key + self.aws_secret_key = known_args.aws_secret_key + self.aws_region = known_args.aws_region + self.aws_service_endpoint = known_args.aws_service_endpoint + self.use_localstack = not known_args.use_real_aws + self.expansion_service = known_args.expansion_service + + if self.use_localstack: + self.set_localstack() + + self.kinesis_helper = KinesisHelper( + self.aws_access_key, + self.aws_secret_key, + self.aws_region, + self.aws_service_endpoint.replace('https', 'http') + if 
self.aws_service_endpoint else None, + ) + + if self.use_localstack: + self.kinesis_helper.create_stream(self.aws_kinesis_stream) + + def tearDown(self): + if self.use_localstack: + self.kinesis_helper.delete_stream(self.aws_kinesis_stream) + + try: + self.localstack.stop() + except: # pylint: disable=bare-except + logging.error('Could not stop the localstack container') + + +class KinesisHelper: + def __init__(self, access_key, secret_key, region, service_endpoint): + self.kinesis_client = boto3.client( + service_name='kinesis', + region_name=region, + endpoint_url=service_endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + + def create_stream(self, stream_name): + # localstack could not have initialized in the container yet so repeat + retries = 10 + for i in range(retries): + try: + self.kinesis_client.create_stream( + StreamName=stream_name, + ShardCount=1, + ) + time.sleep(2) + break + except: # pylint: disable=bare-except + if i == retries - 1: + logging.error('Could not create kinesis stream') + raise + + # Wait for the stream to be active + self.get_first_shard_id(stream_name) + + def delete_stream(self, stream_name): + self.kinesis_client.delete_stream( + StreamName=stream_name, + EnforceConsumerDeletion=True, + ) + + def get_first_shard_id(self, stream_name): + retries = 10 + stream = self.kinesis_client.describe_stream(StreamName=stream_name) + for i in range(retries): + if stream['StreamDescription']['StreamStatus'] == 'ACTIVE': + break + time.sleep(2) + if i == retries - 1: + logging.error('Could not initialize kinesis stream') + raise + stream = self.kinesis_client.describe_stream(StreamName=stream_name) + + return stream['StreamDescription']['Shards'][0]['ShardId'] + + def read_from_stream(self, stream_name): + shard_id = self.get_first_shard_id(stream_name) + + shard_iterator = self.kinesis_client.get_shard_iterator( + StreamName=stream_name, + ShardId=shard_id, + 
ShardIteratorType=InitialPositionInStream.AT_TIMESTAMP, + Timestamp=str(NOW), + ) + + result = self.kinesis_client.get_records( + ShardIterator=shard_iterator['ShardIterator'], + Limit=NUM_RECORDS, + ) + + return [record['Data'] for record in result['Records']] + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 7147bfbff611..a56333edab63 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -115,6 +115,7 @@ def default_io_expansion_service(): ('region', unicode), ('partition_key', unicode), ('service_endpoint', Optional[unicode]), + ('verify_certificate', Optional[bool]), ('producer_properties', Optional[List[Tuple[unicode, unicode]]]), ], ) @@ -136,6 +137,7 @@ def __init__( region, partition_key, service_endpoint=None, + verify_certificate=None, producer_properties=None, expansion_service=None, ): @@ -147,6 +149,8 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint + :param verify_certificate: Enable or disable certificate verification. + Never set to False on production. True by default. :param partition_key: Specify default partition key. :param producer_properties: Specify the configuration properties for Kinesis Producer Library (KPL) as List[KV[string, string]]. 
@@ -163,8 +167,8 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, - producer_properties=list(producer_properties.items()) - if producer_properties else None, + verify_certificate=verify_certificate, + producer_properties=producer_properties, )), expansion_service or default_io_expansion_service(), ) @@ -178,6 +182,7 @@ def __init__( ('aws_secret_key', unicode), ('region', unicode), ('service_endpoint', Optional[unicode]), + ('verify_certificate', Optional[bool]), ('max_num_records', Optional[int]), ('max_read_time', Optional[int]), ('initial_position_in_stream', Optional[unicode]), @@ -218,6 +223,7 @@ def __init__( aws_secret_key, region, service_endpoint=None, + verify_certificate=None, max_num_records=None, max_read_time=None, initial_position_in_stream=None, @@ -238,6 +244,8 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint + :param verify_certificate: Enable or disable certificate verification. + Never set to False on production. True by default. :param max_num_records: Specifies to read at most a given number of records. Must be greater than 0. :param max_read_time: Specifies to read records during x seconds. 
@@ -292,6 +300,7 @@ def __init__( aws_secret_key=aws_secret_key, region=region, service_endpoint=service_endpoint, + verify_certificate=verify_certificate, max_num_records=max_num_records, max_read_time=max_read_time, initial_position_in_stream=initial_position_in_stream, diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index 1a9b8e221be5..3145e70e0f23 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -216,6 +216,7 @@ project.task("postCommitPy${pythonVersionSuffix}IT") { ':runners:flink:1.10:job-server:shadowJar', ':sdks:java:container:docker', ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar', + ':sdks:java:io:kinesis:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ] @@ -224,6 +225,7 @@ project.task("postCommitPy${pythonVersionSuffix}IT") { "apache_beam.io.gcp.bigquery_read_it_test", "apache_beam.io.external.xlang_jdbcio_it_test", "apache_beam.io.external.xlang_kafkaio_it_test", + "apache_beam.io.external.xlang_kinesisio_it_test", ] def testOpts = ["--tests=${tests.join(',')}"] def cmdArgs = mapToArgString([ From ab8735ae56f82a2dc4f11bd285d7b7c3b9b6ca0a Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Tue, 11 Aug 2020 09:56:38 +0200 Subject: [PATCH 07/19] Split the tests for localstack and real aws --- .../io/external/xlang_kinesisio_it_test.py | 42 ++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 047204b4a2d6..d6392ede4494 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -71,19 +71,30 @@ RECORD = b'record' + str(uuid.uuid4()).encode() -@unittest.skipIf(DockerContainer is None, 'testcontainers is not installed.') 
-@unittest.skipIf(boto3 is None, 'boto3 is not installed.') -@unittest.skipIf( - TestPipeline().get_pipeline_options().view_as(StandardOptions).runner is - None, - 'Do not run this test on precommit suites.', -) +@unittest.skipUnless(DockerContainer, 'testcontainers is not installed.') +@unittest.skipUnless(boto3, 'boto3 is not installed.') +@unittest.skipUnless( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner, + 'Do not run this test on precommit suites.') class CrossLanguageKinesisIOTest(unittest.TestCase): - def test_kinesis_io(self): + @unittest.skipUnless( + TestPipeline().get_option('aws_kinesis_stream'), + 'Cannot test on real aws without pipeline options provided') + def test_kinesis_io_roundtrip(self): + # TODO: enable this test for localstack once BEAM-10664 is resolved self.run_kinesis_write() - # TODO: remove once BEAM-10664 is resolved - if not self.use_localstack: - self.run_kinesis_read_data() + self.run_kinesis_read() + + @unittest.skipIf( + TestPipeline().get_option('aws_kinesis_stream'), + 'Do not test on localstack when pipeline options were provided') + def test_kinesis_write(self): + # TODO: remove this test once BEAM-10664 is resolved + self.run_kinesis_write() + records = self.kinesis_helper.read_from_stream(self.aws_kinesis_stream) + self.assertEqual( + sorted(records), + sorted([RECORD + str(i).encode() for i in range(NUM_RECORDS)])) def run_kinesis_write(self): with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p: @@ -104,14 +115,7 @@ def run_kinesis_write(self): partition_key='1', )) - # TODO: Remove once BEAM-10664 is resolved - if self.use_localstack: - records = self.kinesis_helper.read_from_stream(self.aws_kinesis_stream) - self.assertEqual( - sorted(records), - sorted([RECORD + str(i).encode() for i in range(NUM_RECORDS)])) - - def run_kinesis_read_data(self): + def run_kinesis_read(self): records = [RECORD + str(i).encode() for i in range(NUM_RECORDS)] with 
TestPipeline(options=PipelineOptions(self.pipeline_args)) as p: From 602e2f94231301635926c10bc866287874d616d8 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Tue, 11 Aug 2020 09:57:03 +0200 Subject: [PATCH 08/19] Use dict instead of list[kv] --- sdks/python/apache_beam/io/kinesis.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index a56333edab63..f1630b023658 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -153,8 +153,8 @@ def __init__( Never set to False on production. True by default. :param partition_key: Specify default partition key. :param producer_properties: Specify the configuration properties for Kinesis - Producer Library (KPL) as List[KV[string, string]]. - Example: [('CollectionMaxCount', '1000'), ('ConnectTimeout', '10000')] + Producer Library (KPL) as dictionary. + Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'} :param expansion_service: The address (host:port) of the ExpansionService. 
""" super(WriteToKinesis, self).__init__( @@ -168,7 +168,7 @@ def __init__( partition_key=partition_key, service_endpoint=service_endpoint, verify_certificate=verify_certificate, - producer_properties=producer_properties, + producer_properties=list(producer_properties.items()), )), expansion_service or default_io_expansion_service(), ) From ed5637acde271a45655e7bc55b0a85e0f0d202fe Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Tue, 11 Aug 2020 09:57:32 +0200 Subject: [PATCH 09/19] replace assert with warning when future timestamp provided --- sdks/python/apache_beam/io/kinesis.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index f1630b023658..63c73a75231b 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -81,6 +81,7 @@ from __future__ import absolute_import +import logging import time from typing import List from typing import NamedTuple @@ -288,8 +289,8 @@ def __init__( if request_records_limit: assert 0 < request_records_limit <= 10000 - if initial_timestamp_in_stream: - assert initial_timestamp_in_stream < time.time() + if initial_timestamp_in_stream and initial_position_in_stream < time.time(): + logging.warning('Provided timestamp emplaced not in the past.') super(ReadDataFromKinesis, self).__init__( self.URN, From c0ea87e5d1a5b3e359755becdce23c80870ac8a2 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:03:16 +0200 Subject: [PATCH 10/19] Change to checker Nullable --- .../apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java index 4b3b53f75235..54a77abf17f8 100644 --- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -22,7 +22,6 @@ import com.google.auto.service.AutoService; import java.util.Map; import java.util.Properties; -import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; @@ -33,6 +32,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; From 6ac3fc541006eb19786ff42230c8c152e80587a3 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:08:30 +0200 Subject: [PATCH 11/19] Move enums to the end of file --- sdks/python/apache_beam/io/kinesis.py | 32 ++++++++++++++++++--------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 63c73a75231b..f1532a4fd8c5 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -198,17 +198,6 @@ def __init__( ) -class InitialPositionInStream: - LATEST = 'LATEST' - TRIM_HORIZON = 'TRIM_HORIZON' - AT_TIMESTAMP = 'AT_TIMESTAMP' - - -class WatermarkPolicy: - PROCESSING_TYPE = 'PROCESSING_TYPE' - ARRIVAL_TIME = 'ARRIVAL_TIME' - - class ReadDataFromKinesis(ExternalTransform): """ An external PTransform which reads byte array stream from Amazon Kinesis. 
@@ -316,3 +305,24 @@ def __init__( )), expansion_service or default_io_expansion_service(), ) + + +class InitialPositionInStream: + LATEST = 'LATEST' + TRIM_HORIZON = 'TRIM_HORIZON' + AT_TIMESTAMP = 'AT_TIMESTAMP' + + @staticmethod + def validate_param(param): + if param and not hasattr(InitialPositionInStream, param): + raise RuntimeError('Invalid initial position in stream: {}'.format(param)) + + +class WatermarkPolicy: + PROCESSING_TYPE = 'PROCESSING_TYPE' + ARRIVAL_TIME = 'ARRIVAL_TIME' + + @staticmethod + def validate_param(param): + if param and not hasattr(WatermarkPolicy, param): + raise RuntimeError('Invalid watermark policy: {}'.format(param)) From f25f2371d9803c2ca860ef2ca00ab75390c02739 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:10:17 +0200 Subject: [PATCH 12/19] Add Mapping instead of kv list --- .../beam/sdk/io/kinesis/KinesisTransformRegistrar.java | 4 ++-- .../apache_beam/io/external/xlang_kinesisio_it_test.py | 5 +++++ sdks/python/apache_beam/io/kinesis.py | 7 +++---- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java index 54a77abf17f8..6cdf0729816e 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -91,10 +91,10 @@ public static class Configuration extends CrossLanguageConfiguration { private Properties producerProperties; private String partitionKey; - public void setProducerProperties(Iterable> producerProperties) { + public void setProducerProperties(Map producerProperties) { if (producerProperties != null) { Properties properties = new Properties(); - producerProperties.forEach(kv -> properties.setProperty(kv.getKey(), kv.getValue())); + 
producerProperties.forEach(properties::setProperty); this.producerProperties = properties; } } diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index d6392ede4494..c4e7f93b3611 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -113,6 +113,7 @@ def run_kinesis_write(self): service_endpoint=self.aws_service_endpoint, verify_certificate=(not self.use_localstack), partition_key='1', + producer_properties=self.producer_properties, )) def run_kinesis_read(self): @@ -211,6 +212,10 @@ def setUp(self): self.aws_service_endpoint = known_args.aws_service_endpoint self.use_localstack = not known_args.use_real_aws self.expansion_service = known_args.expansion_service + self.producer_properties = { + 'CollectionMaxCount': str(NUM_RECORDS), + 'ConnectTimeout': str(MAX_READ_TIME), + } if self.use_localstack: self.set_localstack() diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index f1532a4fd8c5..626227b6eeab 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -83,10 +83,9 @@ import logging import time -from typing import List +from typing import Mapping from typing import NamedTuple from typing import Optional -from typing import Tuple from past.builtins import unicode @@ -117,7 +116,7 @@ def default_io_expansion_service(): ('partition_key', unicode), ('service_endpoint', Optional[unicode]), ('verify_certificate', Optional[bool]), - ('producer_properties', Optional[List[Tuple[unicode, unicode]]]), + ('producer_properties', Optional[Mapping[unicode, unicode]]), ], ) @@ -169,7 +168,7 @@ def __init__( partition_key=partition_key, service_endpoint=service_endpoint, verify_certificate=verify_certificate, - producer_properties=list(producer_properties.items()), + producer_properties=producer_properties, )), 
expansion_service or default_io_expansion_service(), ) From 5d87990d0dbfb132231edc93cfe2e021ae6e56bc Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:11:18 +0200 Subject: [PATCH 13/19] Change seconds to millis --- .../io/kinesis/KinesisTransformRegistrar.java | 10 +++++----- .../io/external/xlang_kinesisio_it_test.py | 9 ++++++--- sdks/python/apache_beam/io/kinesis.py | 19 ++++++++++++------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java index 6cdf0729816e..1a6e1b17d424 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -147,7 +147,7 @@ public void setMaxNumRecords(@Nullable Long maxNumRecords) { public void setMaxReadTime(@Nullable Long maxReadTime) { if (maxReadTime != null) { - this.maxReadTime = Duration.standardSeconds(maxReadTime); + this.maxReadTime = Duration.millis(maxReadTime); } } @@ -159,7 +159,7 @@ public void setInitialPositionInStream(@Nullable String initialPositionInStream) public void setInitialTimestampInStream(@Nullable Long initialTimestampInStream) { if (initialTimestampInStream != null) { - this.initialTimestampInStream = Instant.ofEpochSecond(initialTimestampInStream); + this.initialTimestampInStream = Instant.ofEpochMilli(initialTimestampInStream); } } @@ -171,7 +171,7 @@ public void setRequestRecordsLimit(@Nullable Long requestRecordsLimit) { public void setUpToDateThreshold(@Nullable Long upToDateThreshold) { if (upToDateThreshold != null) { - this.upToDateThreshold = Duration.standardSeconds(upToDateThreshold); + this.upToDateThreshold = Duration.millis(upToDateThreshold); } } @@ -188,13 +188,13 @@ public void setWatermarkPolicy(@Nullable 
String watermarkPolicy) { public void setWatermarkIdleDurationThreshold(@Nullable Long watermarkIdleDurationThreshold) { if (watermarkIdleDurationThreshold != null) { this.watermarkIdleDurationThreshold = - Duration.standardSeconds(watermarkIdleDurationThreshold); + Duration.millis(watermarkIdleDurationThreshold); } } public void setRateLimit(@Nullable Long rateLimit) { if (rateLimit != null) { - this.rateLimit = Duration.standardSeconds(rateLimit); + this.rateLimit = Duration.millis(rateLimit); } } } diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index c4e7f93b3611..850a528ef155 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -67,7 +67,10 @@ LOCALSTACK_VERSION = '0.11.3' NUM_RECORDS = 10 -NOW = time.time() +MAX_READ_TIME = 5 * 60 * 1000 # 5min +NOW_SECONDS = time.time() +NOW_MILLIS = NOW_SECONDS * 1000 +REQUEST_RECORDS_LIMIT = 1000 RECORD = b'record' + str(uuid.uuid4()).encode() @@ -132,7 +135,7 @@ def run_kinesis_read(self): max_num_records=NUM_RECORDS, max_read_time=300, # 5min initial_position_in_stream=InitialPositionInStream.AT_TIMESTAMP, - initial_timestamp_in_stream=int(NOW), + initial_timestamp_in_stream=NOW_MILLIS, ).with_output_types(bytes)) assert_that(result, equal_to(records)) @@ -297,7 +300,7 @@ def read_from_stream(self, stream_name): StreamName=stream_name, ShardId=shard_id, ShardIteratorType=InitialPositionInStream.AT_TIMESTAMP, - Timestamp=str(NOW), + Timestamp=str(NOW_SECONDS), ) result = self.kinesis_client.get_records( diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 626227b6eeab..01b4c9639a87 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -237,9 +237,9 @@ def __init__( Never set to False on production. True by default. 
:param max_num_records: Specifies to read at most a given number of records. Must be greater than 0. - :param max_read_time: Specifies to read records during x seconds. + :param max_read_time: Specifies to read records during x milliseconds. :param initial_timestamp_in_stream: Specify reading beginning at the given - timestamp in seconds. Must be in the past. + timestamp in milliseconds. Must be in the past. :param initial_position_in_stream: Specify reading from some initial position in stream. Possible values: LATEST - Start after the most recent data record (fetch new data). @@ -251,16 +251,17 @@ def __init__( records. If should be adjusted according to average size of data record to prevent shard overloading. More at: docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html - :param up_to_date_threshold: Specifies how late in seconds records consumed - by this source can be to still be considered on time. Defaults to zero. + :param up_to_date_threshold: Specifies how late in milliseconds records + consumed by this source can be to still be considered on time. Defaults + to zero. :param max_capacity_per_shard: Specifies the maximum number of messages per one shard. Defaults to 10'000. :param watermark_policy: Specifies the watermark policy. Possible values: PROCESSING_TYPE, ARRIVAL_TIME. Defaults to ARRIVAL_TIME. :param watermark_idle_duration_threshold: Use only when watermark policy is ARRIVAL_TIME. Denotes the duration for which the watermark can be idle. - Passed in seconds. - :param rate_limit: Sets fixed rate policy for given seconds value. By + Passed in milliseconds. + :param rate_limit: Sets fixed rate policy for given milliseconds value. By default there is no rate limit. :param expansion_service: The address (host:port) of the ExpansionService. 
""" @@ -277,7 +278,11 @@ def __init__( if request_records_limit: assert 0 < request_records_limit <= 10000 - if initial_timestamp_in_stream and initial_position_in_stream < time.time(): + initial_timestamp_in_stream = int( + initial_timestamp_in_stream) if initial_timestamp_in_stream else None + + if initial_timestamp_in_stream and initial_timestamp_in_stream < time.time( + ): logging.warning('Provided timestamp emplaced not in the past.') super(ReadDataFromKinesis, self).__init__( From 3ea7b96aeaa1ef5f53bb932ccafa02287a93b1fc Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:12:18 +0200 Subject: [PATCH 14/19] Improve params validation --- sdks/python/apache_beam/io/kinesis.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index 01b4c9639a87..98ad4750e332 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -265,15 +265,11 @@ def __init__( default there is no rate limit. :param expansion_service: The address (host:port) of the ExpansionService. 
""" - if watermark_policy: - assert watermark_policy == WatermarkPolicy.ARRIVAL_TIME or\ - watermark_policy == WatermarkPolicy.PROCESSING_TYPE - - if initial_position_in_stream: - i = initial_position_in_stream - assert i == InitialPositionInStream.AT_TIMESTAMP or \ - i == InitialPositionInStream.LATEST or \ - i == InitialPositionInStream.TRIM_HORIZON + WatermarkPolicy.validate_param(watermark_policy) + InitialPositionInStream.validate_param(initial_position_in_stream) + + if watermark_idle_duration_threshold: + assert WatermarkPolicy.ARRIVAL_TIME == watermark_policy if request_records_limit: assert 0 < request_records_limit <= 10000 From 4125094001bb5e70950b5137a31d089ee154d97e Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:12:46 +0200 Subject: [PATCH 15/19] Add more params to the test pipeline --- .../apache_beam/io/external/xlang_kinesisio_it_test.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 850a528ef155..05661aec8fd1 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -45,6 +45,7 @@ import apache_beam as beam from apache_beam.io.kinesis import InitialPositionInStream +from apache_beam.io.kinesis import WatermarkPolicy from apache_beam.io.kinesis import ReadDataFromKinesis from apache_beam.io.kinesis import WriteToKinesis from apache_beam.options.pipeline_options import PipelineOptions @@ -133,7 +134,10 @@ def run_kinesis_read(self): service_endpoint=self.aws_service_endpoint, verify_certificate=not self.use_localstack, max_num_records=NUM_RECORDS, - max_read_time=300, # 5min + max_read_time=MAX_READ_TIME, + request_records_limit=REQUEST_RECORDS_LIMIT, + watermark_policy=WatermarkPolicy.ARRIVAL_TIME, + watermark_idle_duration_threshold=MAX_READ_TIME, 
initial_position_in_stream=InitialPositionInStream.AT_TIMESTAMP, initial_timestamp_in_stream=NOW_MILLIS, ).with_output_types(bytes)) From 33cf24610a51c6db5e84f07776c5233de40f5fcf Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:20:57 +0200 Subject: [PATCH 16/19] Change deprecated knownBuilders to knownBuilderInstances --- .../apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java index 1a6e1b17d424..b776f8993f0b 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -47,8 +47,8 @@ public class KinesisTransformRegistrar implements ExternalTransformRegistrar { public static final String READ_DATA_URN = "beam:external:java:kinesis:read_data:v1"; @Override - public Map>> knownBuilders() { - return ImmutableMap.of(WRITE_URN, WriteBuilder.class, READ_DATA_URN, ReadDataBuilder.class); + public Map> knownBuilderInstances() { + return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_DATA_URN, new ReadDataBuilder()); } private abstract static class CrossLanguageConfiguration { From 16747d195c558425de0a50bc1a78fe57a5811433 Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:21:30 +0200 Subject: [PATCH 17/19] Improve boolean expression --- .../apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java index b776f8993f0b..bdd04d8c044d 100644 --- 
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -80,7 +80,7 @@ public void setServiceEndpoint(@Nullable String serviceEndpoint) { } public void setVerifyCertificate(@Nullable Boolean verifyCertificate) { - this.verifyCertificate = verifyCertificate == null ? true : verifyCertificate; + this.verifyCertificate = verifyCertificate == null || verifyCertificate; } } From a9345ef947f0ac30e9201600fcaab6a79299fdcf Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 10:21:44 +0200 Subject: [PATCH 18/19] run spotless --- .../apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java index bdd04d8c044d..fe7863e05254 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; import org.apache.beam.sdk.transforms.ExternalTransformBuilder; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -187,8 +186,7 @@ public void setWatermarkPolicy(@Nullable String watermarkPolicy) { public void setWatermarkIdleDurationThreshold(@Nullable Long watermarkIdleDurationThreshold) { if (watermarkIdleDurationThreshold != null) { - this.watermarkIdleDurationThreshold = - Duration.millis(watermarkIdleDurationThreshold); + this.watermarkIdleDurationThreshold = 
Duration.millis(watermarkIdleDurationThreshold); } } From 42c62b16a5e379716c0a4a66af8fc08cbea0e0fb Mon Sep 17 00:00:00 2001 From: Piotr Szuberski Date: Wed, 12 Aug 2020 11:40:26 +0200 Subject: [PATCH 19/19] Fix linter --- sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 05661aec8fd1..3df7841fae69 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -45,8 +45,8 @@ import apache_beam as beam from apache_beam.io.kinesis import InitialPositionInStream -from apache_beam.io.kinesis import WatermarkPolicy from apache_beam.io.kinesis import ReadDataFromKinesis +from apache_beam.io.kinesis import WatermarkPolicy from apache_beam.io.kinesis import WriteToKinesis from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions