diff --git a/CHANGES.md b/CHANGES.md index 0d2936254c5e..e42fe6acebc5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -97,6 +97,7 @@ reading data by exporting to JSON files. This has small differences in behavior for Time and Date-related fields. See Pydoc for more information. * Add dispositions for SnowflakeIO.write ([BEAM-10343](https://issues.apache.org/jira/browse/BEAM-10343)) +* Add cross-language support to SnowflakeIO.Read ([BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897)). ## New Features / Improvements diff --git a/sdks/java/io/snowflake/build.gradle b/sdks/java/io/snowflake/build.gradle index 32ad7af53ff7..0a179f588033 100644 --- a/sdks/java/io/snowflake/build.gradle +++ b/sdks/java/io/snowflake/build.gradle @@ -22,6 +22,7 @@ applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.snowflake') provideIntegrationTestingDependencies() enableJavaPerformanceTesting() + description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake" ext.summary = "IO to read and write on Snowflake." dependencies { diff --git a/sdks/java/io/snowflake/expansion-service/build.gradle b/sdks/java/io/snowflake/expansion-service/build.gradle new file mode 100644 index 000000000000..8a6ea6c05f07 --- /dev/null +++ b/sdks/java/io/snowflake/expansion-service/build.gradle @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature(enableChecker:false, + automaticModuleName: 'org.apache.beam.sdk.io.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake :: Expansion Service" +ext.summary = "Expansion service serving Snowflake IO" + +dependencies { + compile project(":sdks:java:expansion-service") + compile project(":sdks:java:io:snowflake") + runtime library.java.slf4j_jdk14 +} + diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java index 3876c2f10d71..2b45dc19338b 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java @@ -18,38 +18,52 @@ package org.apache.beam.sdk.io.snowflake.credentials; import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions; +import org.apache.beam.sdk.io.snowflake.crosslanguage.SnowflakeReadRegistrar; /** * Factory class for creating implementations of {@link SnowflakeCredentials} from {@link * SnowflakePipelineOptions}. 
*/ public class SnowflakeCredentialsFactory { - public static SnowflakeCredentials of(SnowflakePipelineOptions options) { - if (oauthOptionsAvailable(options)) { - return new OAuthTokenSnowflakeCredentials(options.getOauthToken()); - } else if (usernamePasswordOptionsAvailable(options)) { - return new UsernamePasswordSnowflakeCredentials(options.getUsername(), options.getPassword()); - } else if (keyPairOptionsAvailable(options)) { + public static SnowflakeCredentials of(SnowflakePipelineOptions o) { + if (oauthOptionsAvailable(o.getOauthToken())) { + return new OAuthTokenSnowflakeCredentials(o.getOauthToken()); + } else if (usernamePasswordOptionsAvailable(o.getUsername(), o.getPassword())) { + return new UsernamePasswordSnowflakeCredentials(o.getUsername(), o.getPassword()); + } else if (keyPairOptionsAvailable( + o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase())) { return new KeyPairSnowflakeCredentials( - options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase()); + o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase()); } throw new RuntimeException("Can't get credentials from Options"); } - private static boolean oauthOptionsAvailable(SnowflakePipelineOptions options) { - return options.getOauthToken() != null && !options.getOauthToken().isEmpty(); + public static SnowflakeCredentials of(SnowflakeReadRegistrar.ReadConfiguration c) { + if (oauthOptionsAvailable(c.getOAuthToken())) { + return new OAuthTokenSnowflakeCredentials(c.getOAuthToken()); + } else if (usernamePasswordOptionsAvailable(c.getUsername(), c.getPassword())) { + return new UsernamePasswordSnowflakeCredentials(c.getUsername(), c.getPassword()); + } else if (keyPairOptionsAvailable( + c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase())) { + return new KeyPairSnowflakeCredentials( + c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase()); + } + throw new RuntimeException("Can't get credentials from 
Options"); + } + + private static boolean oauthOptionsAvailable(String token) { + return token != null && !token.isEmpty(); } - private static boolean usernamePasswordOptionsAvailable(SnowflakePipelineOptions options) { - return options.getUsername() != null - && !options.getUsername().isEmpty() - && !options.getPassword().isEmpty(); + private static boolean usernamePasswordOptionsAvailable(String username, String password) { + return username != null && !username.isEmpty() && !password.isEmpty(); } - private static boolean keyPairOptionsAvailable(SnowflakePipelineOptions options) { - return options.getUsername() != null - && !options.getUsername().isEmpty() - && !options.getPrivateKeyPath().isEmpty() - && !options.getPrivateKeyPassphrase().isEmpty(); + private static boolean keyPairOptionsAvailable( + String username, String privateKeyPath, String privateKeyPassphrase) { + return username != null + && !username.isEmpty() + && !privateKeyPath.isEmpty() + && !privateKeyPassphrase.isEmpty(); } } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java new file mode 100644 index 000000000000..38162aea4fc0 --- /dev/null +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.snowflake.crosslanguage; + +/** Parameters abstract class to expose the transforms to an external SDK. */ +public abstract class Configuration { + private String serverName; + private String username; + private String password; + private String privateKeyPath; + private String privateKeyPassphrase; + private String oAuthToken; + private String database; + private String schema; + private String table; + private String query; + private String stagingBucketName; + private String storageIntegrationName; + + public String getServerName() { + return serverName; + } + + public void setServerName(String serverName) { + this.serverName = serverName; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getPrivateKeyPath() { + return privateKeyPath; + } + + public void setPrivateKeyPath(String privateKeyPath) { + this.privateKeyPath = privateKeyPath; + } + + public String getPrivateKeyPassphrase() { + return privateKeyPassphrase; + } + + public void setPrivateKeyPassphrase(String privateKeyPassphrase) { + this.privateKeyPassphrase = privateKeyPassphrase; + } + + public String getOAuthToken() { + return oAuthToken; + } + + public void setOAuthToken(String oAuthToken) { + this.oAuthToken = oAuthToken; + } + + public String getDatabase() { + return database; + } + + public void 
setDatabase(String database) { + this.database = database; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + public String getStagingBucketName() { + return stagingBucketName; + } + + public void setStagingBucketName(String stagingBucketName) { + this.stagingBucketName = stagingBucketName; + } + + public String getStorageIntegrationName() { + return storageIntegrationName; + } + + public void setStorageIntegrationName(String storageIntegrationName) { + this.storageIntegrationName = storageIntegrationName; + } +} diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java new file mode 100644 index 000000000000..1e7be0f23f7b --- /dev/null +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.snowflake.crosslanguage; + +import com.google.auto.service.AutoService; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.Map; +import javax.sql.DataSource; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.snowflake.SnowflakeIO; +import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials; +import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Exposes {@link SnowflakeIO.Read} as an external transform for cross-language usage. */ +@Experimental +@AutoService(ExternalTransformRegistrar.class) +public final class SnowflakeReadRegistrar implements ExternalTransformRegistrar { + + public static final String URN = "beam:external:java:snowflake:read:v1"; + + @Override + public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() { + return ImmutableMap.of(URN, ReadBuilder.class); + } + + /** Parameters class to expose the transform to an external SDK. 
*/ + public static class ReadConfiguration extends Configuration {} + + public static class ReadBuilder + implements ExternalTransformBuilder<ReadConfiguration, PBegin, PCollection<byte[]>> { + public ReadBuilder() {} + + @Override + public PTransform<PBegin, PCollection<byte[]>> buildExternal(ReadConfiguration c) { + SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c); + + SerializableFunction<Void, DataSource> dataSourceSerializableFunction = + SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of( + SnowflakeIO.DataSourceConfiguration.create(credentials) + .withServerName(c.getServerName()) + .withDatabase(c.getDatabase()) + .withSchema(c.getSchema())); + + return SnowflakeIO.<byte[]>read() + .withStorageIntegrationName(c.getStorageIntegrationName()) + .withStagingBucketName(c.getStagingBucketName()) + .withDataSourceProviderFn(dataSourceSerializableFunction) + .withCsvMapper(CsvMapper.getCsvMapper()) + .withCoder(ByteArrayCoder.of()) + .fromTable(c.getTable()) + .fromQuery(c.getQuery()); + } + } + + private static class CsvMapper implements Serializable { + + public static SnowflakeIO.CsvMapper<byte[]> getCsvMapper() { + return (SnowflakeIO.CsvMapper<byte[]>) + parts -> { + String partsCSV = String.join(",", parts); + + return partsCSV.getBytes(Charset.defaultCharset()); + }; + } + } +} diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java new file mode 100644 index 000000000000..7e24ee94770c --- /dev/null +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Cross-language for SnowflakeIO. */ +@Experimental(Kind.PORTABILITY) +@DefaultAnnotation(NonNull.class) +package org.apache.beam.sdk.io.snowflake.crosslanguage; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py new file mode 100644 index 000000000000..e7ffa6a67036 --- /dev/null +++ b/sdks/python/apache_beam/io/external/snowflake.py @@ -0,0 +1,204 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Snowflake transforms tested against Flink portable runner. + + **Setup** + + Transforms provided in this module are cross-language transforms + implemented in the Beam Java SDK. During the pipeline construction, Python SDK + will connect to a Java expansion service to expand these transforms. + To facilitate this, a small amount of setup is needed before using these + transforms in a Beam Python pipeline. + + There are several ways to set up cross-language Snowflake transforms. + + * Option 1: use the default expansion service + * Option 2: specify a custom expansion service + + See below for details regarding each of these options. + + *Option 1: Use the default expansion service* + + This is the recommended and easiest setup option for using Python Snowflake + transforms. This option requires the following pre-requisites + before running the Beam pipeline. + + * Install Java runtime in the computer from where the pipeline is constructed + and make sure that 'java' command is available. + + In this option, Python SDK will either download (for released Beam version) or + build (when running from a Beam Git clone) an expansion service jar and use + that to expand transforms. Currently Snowflake transforms use the + 'beam-sdks-java-io-expansion-service' jar for this purpose. + + *Option 2: specify a custom expansion service* + + In this option, you start up your own expansion service and provide that as + a parameter when using the transforms provided in this module. + + This option requires the following pre-requisites before running the Beam + pipeline. + + * Start up your own expansion service. + * Update your pipeline to provide the expansion service address when + initiating Snowflake transforms provided in this module. + + Flink users can use the built-in Expansion Service of the Flink Runner's + Job Server. 
If you start Flink's Job Server, the expansion service will be + started on port 8097. For a different address, please set the + expansion_service parameter. + + **More information** + + For more information regarding cross-language transforms see: + - https://beam.apache.org/roadmap/portability/ + + For more information specific to Flink runner see: + - https://beam.apache.org/documentation/runners/flink/ +""" + +# pytype: skip-file + +from __future__ import absolute_import + +import typing + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +ReadFromSnowflakeSchema = typing.NamedTuple( + 'ReadFromSnowflakeSchema', + [ + ('server_name', unicode), + ('schema', unicode), + ('database', unicode), + ('staging_bucket_name', unicode), + ('storage_integration_name', unicode), + ('username', typing.Optional[unicode]), + ('password', typing.Optional[unicode]), + ('private_key_path', typing.Optional[unicode]), + ('private_key_passphrase', typing.Optional[unicode]), + ('o_auth_token', typing.Optional[unicode]), + ('table', typing.Optional[unicode]), + ('query', typing.Optional[unicode]), + ]) + + +def default_io_expansion_service(): + return BeamJarExpansionService( + 'sdks:java:io:snowflake:expansion-service:shadowJar') + + +class ReadFromSnowflake(beam.PTransform): + """ + An external PTransform which reads from Snowflake. + """ + + URN = 'beam:external:java:snowflake:read:v1' + + def __init__( + self, + server_name, + schema, + database, + staging_bucket_name, + storage_integration_name, + csv_mapper, + username=None, + password=None, + private_key_path=None, + private_key_passphrase=None, + o_auth_token=None, + table=None, + query=None, + expansion_service=None): + """ + Initializes a read operation from Snowflake. 
+ + Required parameters: + + :param server_name: full Snowflake server name with the following format + account.region.gcp.snowflakecomputing.com. + :param schema: name of the Snowflake schema in the database to use. + :param database: name of the Snowflake database to use. + :param staging_bucket_name: name of the Google Cloud Storage bucket.:: + Bucket will be used as a temporary location for storing CSV files. + Those temporary directories will be named + 'sf_copy_csv_DATE_TIME_RANDOMSUFFIX' + and they will be removed automatically once Read operation finishes. + :param storage_integration_name: is the name of storage integration + object created according to Snowflake documentation. + :param csv_mapper: specifies a function which must translate + user-defined object to array of strings. + SnowflakeIO uses a COPY INTO statement to move data from + a Snowflake table to Google Cloud Storage as CSV files. These files + are then downloaded via FileIO and processed line by line. + Each line is split into an array of Strings using the OpenCSV library. + The csv_mapper function job is to give the user the possibility to + convert the array of Strings to a user-defined type, + i.e. GenericRecord for Avro or Parquet files, or custom objects. + Example: + def csv_mapper(strings_array): + return User(strings_array[0], int(strings_array[1])) + :param table: specifies a Snowflake table name. + :param query: specifies a Snowflake custom SQL query. + :param expansion_service: specifies URL of expansion service. + + Authentication parameters: + + :param username: specifies username for + username/password authentication method. + :param password: specifies password for + username/password authentication method. + :param private_key_path: specifies a private key file for + key pair authentication method. + :param private_key_passphrase: specifies password for + key pair authentication method. + :param o_auth_token: specifies access token for + OAuth authentication method. 
+ """ + self.params = ReadFromSnowflakeSchema( + server_name=server_name, + schema=schema, + database=database, + staging_bucket_name=staging_bucket_name, + storage_integration_name=storage_integration_name, + username=username, + password=password, + private_key_path=private_key_path, + private_key_passphrase=private_key_passphrase, + o_auth_token=o_auth_token, + table=table, + query=query) + self.csv_mapper = csv_mapper + self.expansion_service = expansion_service or default_io_expansion_service() + + def expand(self, pbegin): + return ( + pbegin + | ExternalTransform( + self.URN, + NamedTupleBasedPayloadBuilder(self.params), + self.expansion_service, + ) + | 'CSV to array mapper' >> beam.Map(lambda csv: csv.split(b',')) + | 'CSV mapper' >> beam.Map(self.csv_mapper)) diff --git a/settings.gradle b/settings.gradle index 02208578af26..638c216b6231 100644 --- a/settings.gradle +++ b/settings.gradle @@ -120,6 +120,7 @@ include ":sdks:java:io:rabbitmq" include ":sdks:java:io:redis" include ":sdks:java:io:solr" include ":sdks:java:io:snowflake" +include ":sdks:java:io:snowflake:expansion-service" include ":sdks:java:io:splunk" include ":sdks:java:io:thrift" include ":sdks:java:io:tika" diff --git a/website/www/site/content/en/roadmap/connectors-multi-sdk.md b/website/www/site/content/en/roadmap/connectors-multi-sdk.md index 464ad83af4ce..3b13f59bc261 100644 --- a/website/www/site/content/en/roadmap/connectors-multi-sdk.md +++ b/website/www/site/content/en/roadmap/connectors-multi-sdk.md @@ -80,6 +80,7 @@ Ongoing and planned work related to making existing connectors/transforms availa * Java KafkaIO - completed - [BEAM-7029](https://issues.apache.org/jira/browse/BEAM-7029) * Java KinesisIO - In progress - [BEAM-10137](https://issues.apache.org/jira/browse/BEAM-10137), [BEAM-10138](https://issues.apache.org/jira/browse/BEAM-10138) * Java PubSubIO - In progress - [BEAM-7738](https://issues.apache.org/jira/browse/BEAM-7738) +* Java SnowflakeIO - In progress - 
[BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897), [BEAM-9898](https://issues.apache.org/jira/browse/BEAM-9898) * Java SpannerIO - In progress - [BEAM-10139](https://issues.apache.org/jira/browse/BEAM-10139), [BEAM-10140](https://issues.apache.org/jira/browse/BEAM-10140) * Java SQL - completed - [BEAM-8603](https://issues.apache.org/jira/browse/BEAM-8603)