From 37fe29336955c8df4c3faee88dc70ff18bc89135 Mon Sep 17 00:00:00 2001 From: "pawel.urbanowicz" Date: Tue, 30 Jun 2020 08:13:17 +0200 Subject: [PATCH 1/6] [BEAM-9897] improve credentials mechanism --- .../beam/sdk/io/snowflake/SnowflakeIO.java | 186 ++++++++++-------- .../KeyPairSnowflakeCredentials.java | 14 +- .../OAuthTokenSnowflakeCredentials.java | 7 + .../credentials/SnowflakeCredentials.java | 6 +- .../SnowflakeCredentialsFactory.java | 56 ++++-- .../UsernamePasswordSnowflakeCredentials.java | 7 + .../unit/DataSourceConfigurationTest.java | 6 - .../ExternalReadCredentialsTest.java | 73 +++++++ .../OAuthTokenSnowflakeCredentialsTest.java | 4 +- .../SnowflakeCredentialsFactoryTest.java | 3 +- ...rnamePasswordSnowflakeCredentialsTest.java | 4 +- 11 files changed, 245 insertions(+), 121 deletions(-) create mode 100644 sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/ExternalReadCredentialsTest.java diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java index 845f137d04bf..8d04c1a0a7d9 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.io.Serializable; import java.security.PrivateKey; -import java.sql.Connection; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -35,6 +34,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.sql.DataSource; import net.snowflake.client.jdbc.SnowflakeBasicDataSource; import org.apache.beam.sdk.annotations.Experimental; @@ -78,7 +78,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -232,22 +231,29 @@ public static Write write() { /** Implementation of {@link #read()}. */ @AutoValue public abstract static class Read extends PTransform> { + @Nullable + abstract SerializableFunction getDataSourceProviderFn(); - abstract @Nullable SerializableFunction getDataSourceProviderFn(); + @Nullable + abstract String getQuery(); - abstract @Nullable String getQuery(); + @Nullable + abstract String getTable(); - abstract @Nullable String getTable(); + @Nullable + abstract String getStorageIntegrationName(); - abstract @Nullable String getStorageIntegrationName(); + @Nullable + abstract String getStagingBucketName(); - abstract @Nullable String getStagingBucketName(); + @Nullable + abstract CsvMapper getCsvMapper(); - abstract @Nullable CsvMapper getCsvMapper(); + @Nullable + abstract Coder getCoder(); - abstract @Nullable Coder getCoder(); - - abstract @Nullable SnowflakeService getSnowflakeService(); + @Nullable + abstract SnowflakeService getSnowflakeService(); abstract Builder toBuilder(); @@ -512,28 +518,38 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Implementation of {@link #write()}. 
*/ @AutoValue public abstract static class Write extends PTransform, PDone> { + @Nullable + abstract SerializableFunction getDataSourceProviderFn(); - abstract @Nullable SerializableFunction getDataSourceProviderFn(); - - abstract @Nullable String getTable(); + @Nullable + abstract String getTable(); - abstract @Nullable String getStorageIntegrationName(); + @Nullable + abstract String getStorageIntegrationName(); - abstract @Nullable String getStagingBucketName(); + @Nullable + abstract String getStagingBucketName(); - abstract @Nullable String getQuery(); + @Nullable + abstract String getQuery(); - abstract @Nullable String getFileNameTemplate(); + @Nullable + abstract String getFileNameTemplate(); - abstract @Nullable WriteDisposition getWriteDisposition(); + @Nullable + abstract WriteDisposition getWriteDisposition(); - abstract @Nullable CreateDisposition getCreateDisposition(); + @Nullable + abstract CreateDisposition getCreateDisposition(); - abstract @Nullable SnowflakeTableSchema getTableSchema(); + @Nullable + abstract SnowflakeTableSchema getTableSchema(); - abstract @Nullable UserDataMapper getUserDataMapper(); + @Nullable + abstract UserDataMapper getUserDataMapper(); - abstract @Nullable SnowflakeService getSnowflakeService(); + @Nullable + abstract SnowflakeService getSnowflakeService(); abstract Builder toBuilder(); @@ -883,36 +899,47 @@ public void processElement(ProcessContext context) throws Exception { */ @AutoValue public abstract static class DataSourceConfiguration implements Serializable { + @Nullable + public abstract String getUrl(); - public abstract @Nullable String getUrl(); - - public abstract @Nullable String getUsername(); + @Nullable + public abstract String getUsername(); - public abstract @Nullable String getPassword(); + @Nullable + public abstract String getPassword(); - public abstract @Nullable PrivateKey getPrivateKey(); + @Nullable + public abstract PrivateKey getPrivateKey(); - public abstract @Nullable String getOauthToken(); + @Nullable + public abstract String getOauthToken(); - public abstract @Nullable String getDatabase(); + @Nullable + public abstract String getDatabase(); - public abstract @Nullable String getWarehouse(); + @Nullable + public abstract String getWarehouse(); - public abstract @Nullable String getSchema(); + @Nullable + public abstract String getSchema(); - public abstract @Nullable String getServerName(); + @Nullable + public abstract String getServerName(); - public abstract @Nullable Integer getPortNumber(); + @Nullable + public abstract Integer getPortNumber(); - public abstract @Nullable String getRole(); + @Nullable + public abstract String getRole(); - public abstract @Nullable Integer getLoginTimeout(); + @Nullable + public abstract Integer getLoginTimeout(); - public abstract @Nullable Boolean getSsl(); + @Nullable + public abstract Boolean getSsl(); - public abstract @Nullable Boolean getValidate(); - - public abstract @Nullable DataSource getDataSource(); + @Nullable + public abstract DataSource getDataSource(); abstract Builder builder(); @@ -944,8 +971,6 @@ abstract static class Builder { abstract Builder setSsl(Boolean ssl); - abstract Builder setValidate(Boolean validate); - abstract Builder setDataSource(DataSource dataSource); abstract DataSourceConfiguration build(); @@ -959,7 +984,6 @@ abstract static class Builder { public static DataSourceConfiguration create(DataSource dataSource) { checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable"); return new 
AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() - .setValidate(true) .setDataSource(dataSource) .build(); } @@ -970,26 +994,44 @@ public static DataSourceConfiguration create(DataSource dataSource) { * @param credentials - an instance of {@link SnowflakeCredentials}. */ public static DataSourceConfiguration create(SnowflakeCredentials credentials) { - if (credentials instanceof UsernamePasswordSnowflakeCredentials) { - return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() - .setValidate(true) - .setUsername(((UsernamePasswordSnowflakeCredentials) credentials).getUsername()) - .setPassword(((UsernamePasswordSnowflakeCredentials) credentials).getPassword()) - .build(); - } else if (credentials instanceof OAuthTokenSnowflakeCredentials) { - return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() - .setValidate(true) - .setOauthToken(((OAuthTokenSnowflakeCredentials) credentials).getToken()) - .build(); - } else if (credentials instanceof KeyPairSnowflakeCredentials) { - return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() - .setValidate(true) - .setUsername(((KeyPairSnowflakeCredentials) credentials).getUsername()) - .setPrivateKey(((KeyPairSnowflakeCredentials) credentials).getPrivateKey()) - .build(); - } - throw new IllegalArgumentException( - "Can't create DataSourceConfiguration from given credentials"); + return credentials.createSnowflakeDataSourceConfiguration(); + } + + /** + * Creates {@link DataSourceConfiguration} from instance of {@link + * UsernamePasswordSnowflakeCredentials}. + * + * @param credentials - an instance of {@link UsernamePasswordSnowflakeCredentials}. + */ + public static DataSourceConfiguration create(UsernamePasswordSnowflakeCredentials credentials) { + return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() + .setUsername(credentials.getUsername()) + .setPassword(credentials.getPassword()) + .build(); + } + + /** + * Creates {@link DataSourceConfiguration} from instance of {@link KeyPairSnowflakeCredentials}. + * + * @param credentials - an instance of {@link KeyPairSnowflakeCredentials}. + */ + public static DataSourceConfiguration create(KeyPairSnowflakeCredentials credentials) { + return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() + .setUsername(credentials.getUsername()) + .setPrivateKey(credentials.getPrivateKey()) + .build(); + } + + /** + * Creates {@link DataSourceConfiguration} from instance of {@link + * OAuthTokenSnowflakeCredentials}. + * + * @param credentials - an instance of {@link OAuthTokenSnowflakeCredentials}. + */ + public static DataSourceConfiguration create(OAuthTokenSnowflakeCredentials credentials) { + return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() + .setOauthToken(credentials.getToken()) + .build(); } /** @@ -1079,15 +1121,6 @@ public DataSourceConfiguration withLoginTimeout(Integer loginTimeout) { return builder().setLoginTimeout(loginTimeout).build(); } - /** - * Disables validation of connection parameters prior to pipeline submission. 
- * - * @return - */ - public DataSourceConfiguration withoutValidation() { - return builder().setValidate(false).build(); - } - void populateDisplayData(DisplayData.Builder builder) { if (getDataSource() != null) { builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName())); @@ -1163,15 +1196,6 @@ public static class DataSourceProviderFromDataSourceConfiguration private final DataSourceConfiguration config; private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) { - if (config.getValidate()) { - try { - Connection connection = config.buildDatasource().getConnection(); - connection.close(); - } catch (SQLException e) { - throw new IllegalArgumentException( - "Invalid DataSourceConfiguration. Underlying cause: " + e); - } - } this.config = config; } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/KeyPairSnowflakeCredentials.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/KeyPairSnowflakeCredentials.java index 286ec628fc8b..ea6d2d19a1d3 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/KeyPairSnowflakeCredentials.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/KeyPairSnowflakeCredentials.java @@ -31,16 +31,17 @@ import javax.crypto.EncryptedPrivateKeyInfo; import javax.crypto.SecretKeyFactory; import javax.crypto.spec.PBEKeySpec; +import org.apache.beam.sdk.io.snowflake.SnowflakeIO; /** POJO for handling Key-Pair authentication against Snowflake. */ public class KeyPairSnowflakeCredentials implements SnowflakeCredentials { - private String username; - private PrivateKey privateKey; + private final String username; + private final PrivateKey privateKey; public KeyPairSnowflakeCredentials( - String username, String privateKeyPath, String privateKeyPassword) { + String username, String privateKeyPath, String privateKeyPassphrase) { this.username = username; - this.privateKey = getPrivateKey(privateKeyPath, privateKeyPassword); + this.privateKey = getPrivateKey(privateKeyPath, privateKeyPassphrase); } public KeyPairSnowflakeCredentials(String username, PrivateKey privateKey) { @@ -48,6 +49,11 @@ public KeyPairSnowflakeCredentials(String username, PrivateKey privateKey) { this.privateKey = privateKey; } + @Override + public SnowflakeIO.DataSourceConfiguration createSnowflakeDataSourceConfiguration() { + return SnowflakeIO.DataSourceConfiguration.create(this); + } + private PrivateKey getPrivateKey(String privateKeyPath, String privateKeyPassphrase) { try { byte[] keyBytes = Files.readAllBytes(Paths.get(privateKeyPath)); diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/OAuthTokenSnowflakeCredentials.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/OAuthTokenSnowflakeCredentials.java index be102a8b0f41..8f538f7cbcec 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/OAuthTokenSnowflakeCredentials.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/OAuthTokenSnowflakeCredentials.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.snowflake.credentials; +import org.apache.beam.sdk.io.snowflake.SnowflakeIO; + /** POJO for handling OAuth authentication against Snowflake, using pre-obtained OAuth token. 
*/ public class OAuthTokenSnowflakeCredentials implements SnowflakeCredentials { private String token; @@ -28,4 +30,9 @@ public OAuthTokenSnowflakeCredentials(String token) { public String getToken() { return token; } + + @Override + public SnowflakeIO.DataSourceConfiguration createSnowflakeDataSourceConfiguration() { + return SnowflakeIO.DataSourceConfiguration.create(this); + } } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentials.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentials.java index e3abf91f7d12..7e127c51c5e4 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentials.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentials.java @@ -17,8 +17,12 @@ */ package org.apache.beam.sdk.io.snowflake.credentials; +import org.apache.beam.sdk.io.snowflake.SnowflakeIO; + /** * Interface for holding credentials. Allows creating {@link * org.apache.beam.sdk.io.snowflake.SnowflakeIO.DataSourceConfiguration}. */ -public interface SnowflakeCredentials {} +public interface SnowflakeCredentials { + SnowflakeIO.DataSourceConfiguration createSnowflakeDataSourceConfiguration(); +} diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java index 3876c2f10d71..7dd6f0fca494 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java @@ -18,38 +18,52 @@ package org.apache.beam.sdk.io.snowflake.credentials; import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions; +import org.apache.beam.sdk.io.snowflake.xlang.Configuration; /** * Factory class for creating implementations of {@link SnowflakeCredentials} from {@link * SnowflakePipelineOptions}. 
*/ public class SnowflakeCredentialsFactory { - public static SnowflakeCredentials of(SnowflakePipelineOptions options) { - if (oauthOptionsAvailable(options)) { - return new OAuthTokenSnowflakeCredentials(options.getOauthToken()); - } else if (usernamePasswordOptionsAvailable(options)) { - return new UsernamePasswordSnowflakeCredentials(options.getUsername(), options.getPassword()); - } else if (keyPairOptionsAvailable(options)) { - return new KeyPairSnowflakeCredentials( - options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase()); - } - throw new RuntimeException("Can't get credentials from Options"); + public static SnowflakeCredentials of(SnowflakePipelineOptions o) { + return createCredentials( + o.getOauthToken(), + o.getPrivateKeyPath(), + o.getPrivateKeyPassphrase(), + o.getUsername(), + o.getPassword()); } - private static boolean oauthOptionsAvailable(SnowflakePipelineOptions options) { - return options.getOauthToken() != null && !options.getOauthToken().isEmpty(); + public static SnowflakeCredentials createCredentials(Configuration c) { + return createCredentials( + c.getOAuthToken(), + c.getPrivateKeyPath(), + c.getPrivateKeyPassphrase(), + c.getUsername(), + c.getPassword()); } - private static boolean usernamePasswordOptionsAvailable(SnowflakePipelineOptions options) { - return options.getUsername() != null - && !options.getUsername().isEmpty() - && !options.getPassword().isEmpty(); + private static SnowflakeCredentials createCredentials( + String oAuth, + String privateKeyPath, + String privateKeyPassphrase, + String username, + String password) { + + if (isNotEmpty(oAuth)) { + return new OAuthTokenSnowflakeCredentials(oAuth); + } else if (isNotEmpty(privateKeyPath) + && isNotEmpty(username) + && isNotEmpty(privateKeyPassphrase)) { + return new KeyPairSnowflakeCredentials(username, privateKeyPath, privateKeyPassphrase); + } else if (isNotEmpty(username) && isNotEmpty(password)) { + return new UsernamePasswordSnowflakeCredentials(username, password); + } else { + throw new RuntimeException("Can't get credentials"); + } } - private static boolean keyPairOptionsAvailable(SnowflakePipelineOptions options) { - return options.getUsername() != null - && !options.getUsername().isEmpty() - && !options.getPrivateKeyPath().isEmpty() - && !options.getPrivateKeyPassphrase().isEmpty(); + private static boolean isNotEmpty(String s) { + return s != null && !s.isEmpty(); } } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/UsernamePasswordSnowflakeCredentials.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/UsernamePasswordSnowflakeCredentials.java index 1d8bdce0cc2e..6e1cb321e695 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/UsernamePasswordSnowflakeCredentials.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/UsernamePasswordSnowflakeCredentials.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.snowflake.credentials; +import org.apache.beam.sdk.io.snowflake.SnowflakeIO; + /** POJO for handling Username & Password authentication against Snowflake. 
*/ public class UsernamePasswordSnowflakeCredentials implements SnowflakeCredentials { private String username; @@ -34,4 +36,9 @@ public String getUsername() { public String getPassword() { return password; } + + @Override + public SnowflakeIO.DataSourceConfiguration createSnowflakeDataSourceConfiguration() { + return SnowflakeIO.DataSourceConfiguration.create(this); + } } diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java index b0400c8aafbe..cef1fc46c68c 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java @@ -114,12 +114,6 @@ public void testSettingStringLoginTimeout() { assertEquals(loginTimeout, configuration.getLoginTimeout()); } - @Test - public void testSettingValidate() { - configuration = configuration.withoutValidation(); - assertEquals(false, configuration.getValidate()); - } - @Test public void testDataSourceCreatedFromUrl() { String url = "jdbc:snowflake://account.snowflakecomputing.com"; diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/ExternalReadCredentialsTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/ExternalReadCredentialsTest.java new file mode 100644 index 000000000000..506266fe5d48 --- /dev/null +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/ExternalReadCredentialsTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.snowflake.test.unit.credentials; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +import org.apache.beam.sdk.io.snowflake.test.TestUtils; +import org.apache.beam.sdk.io.snowflake.xlang.ExternalRead; +import org.apache.beam.sdk.io.snowflake.xlang.ExternalRead.ReadConfiguration; +import org.junit.Test; + +public class ExternalReadCredentialsTest { + private static final String SERVER_NAME = "server_name.snowflakecomputing.com"; + private static final String DATABASE = "test_database"; + private static final String SCHEMA = "public"; + private static final String STAGING_BUCKET_NAME = "bucket/"; + + @Test + public void testBuildExternalTransformWithoutCredentials() { + ReadConfiguration configuration = createTestConfiguration(); + assertThrows( + RuntimeException.class, () -> new ExternalRead.ReadBuilder().buildExternal(configuration)); + } + + @Test + public void testBuildExternalTransformUsingOAuthToken() { + ReadConfiguration configuration = createTestConfiguration(); + configuration.setOAuthToken("token"); + assertNotNull(new ExternalRead.ReadBuilder().buildExternal(configuration)); + } + + @Test + public void testBuildExternalTransformUsingUsernameAndPassword() { + ReadConfiguration configuration = createTestConfiguration(); + configuration.setUsername("username"); + configuration.setPassword("password"); + assertNotNull(new ExternalRead.ReadBuilder().buildExternal(configuration)); + } + + @Test + public void testBuildExternalTransformUsingKeyPair() { + ReadConfiguration configuration = createTestConfiguration(); + configuration.setUsername("username"); + configuration.setPrivateKeyPath(TestUtils.getPrivateKeyPath(getClass())); + configuration.setPrivateKeyPassphrase(TestUtils.getPrivateKeyPassphrase()); + assertNotNull(new ExternalRead.ReadBuilder().buildExternal(configuration)); + } + + private ReadConfiguration createTestConfiguration() { + ReadConfiguration configuration = new ReadConfiguration(); + configuration.setServerName(SERVER_NAME); + configuration.setDatabase(DATABASE); + configuration.setSchema(SCHEMA); + configuration.setStagingBucketName(STAGING_BUCKET_NAME); + return configuration; + } +} diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/OAuthTokenSnowflakeCredentialsTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/OAuthTokenSnowflakeCredentialsTest.java index a1dee76f1601..8f3d774f397a 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/OAuthTokenSnowflakeCredentialsTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/OAuthTokenSnowflakeCredentialsTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.snowflake.test.unit.credentials; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.io.snowflake.SnowflakeIO; import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials; @@ -38,9 +37,8 @@ public void testBuildingDataSource() { OAuthTokenSnowflakeCredentials credentials = new OAuthTokenSnowflakeCredentials("token"); SnowflakeIO.DataSourceConfiguration configuration = - SnowflakeIO.DataSourceConfiguration.create(credentials); + credentials.createSnowflakeDataSourceConfiguration(); assertEquals(credentials.getToken(), configuration.getOauthToken()); - 
assertTrue(configuration.getValidate()); } } diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/SnowflakeCredentialsFactoryTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/SnowflakeCredentialsFactoryTest.java index f9f612d8f002..8e8d94d06c72 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/SnowflakeCredentialsFactoryTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/SnowflakeCredentialsFactoryTest.java @@ -56,7 +56,6 @@ public void oauthTokenTest() { @Test public void keyPairTest() { SnowflakePipelineOptions options = PipelineOptionsFactory.as(SnowflakePipelineOptions.class); - System.out.println(TestUtils.getPrivateKeyPath(getClass())); options.setUsername("username"); options.setPrivateKeyPath(TestUtils.getPrivateKeyPath(getClass())); options.setPrivateKeyPassphrase(TestUtils.getPrivateKeyPassphrase()); @@ -72,6 +71,6 @@ public void emptyOptionsTest() { Exception ex = assertThrows(RuntimeException.class, () -> SnowflakeCredentialsFactory.of(options)); - assertEquals("Can't get credentials from Options", ex.getMessage()); + assertEquals("Can't get credentials", ex.getMessage()); } } diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/UsernamePasswordSnowflakeCredentialsTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/UsernamePasswordSnowflakeCredentialsTest.java index 0c7503a079b8..e34dc57c2c33 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/UsernamePasswordSnowflakeCredentialsTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/UsernamePasswordSnowflakeCredentialsTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.snowflake.test.unit.credentials; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.io.snowflake.SnowflakeIO; import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials; @@ -41,10 +40,9 @@ public void testBuildingDataSource() { new UsernamePasswordSnowflakeCredentials("username", "password"); SnowflakeIO.DataSourceConfiguration configuration = - SnowflakeIO.DataSourceConfiguration.create(credentials); + credentials.createSnowflakeDataSourceConfiguration(); assertEquals("username", configuration.getUsername()); assertEquals("password", configuration.getPassword()); - assertTrue(configuration.getValidate()); } } From 0ad69cb2617d91903cb3c475cfefede8ace4782d Mon Sep 17 00:00:00 2001 From: "pawel.urbanowicz" Date: Tue, 30 Jun 2020 08:14:29 +0200 Subject: [PATCH 2/6] [BEAM-9897] add xlang support for SnowflakeIO.read --- CHANGES.md | 1 + sdks/java/container/boot.go | 1 + sdks/java/container/build.gradle | 3 + sdks/java/expansion-service/build.gradle | 2 + sdks/java/io/snowflake/build.gradle | 1 + .../SnowflakeCredentialsFactory.java | 16 +-- .../sdk/io/snowflake/xlang/Configuration.java | 130 +++++++++++++++++ .../sdk/io/snowflake/xlang/ExternalRead.java | 90 ++++++++++++ .../sdk/io/snowflake/xlang/package-info.java | 27 ++++ .../apache_beam/io/external/snowflake.py | 135 ++++++++++++++++++ .../en/roadmap/connectors-multi-sdk.md | 1 + 11 files changed, 398 insertions(+), 9 deletions(-) create mode 100644 
sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/Configuration.java create mode 100644 sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/ExternalRead.java create mode 100644 sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/package-info.java create mode 100644 sdks/python/apache_beam/io/external/snowflake.py diff --git a/CHANGES.md b/CHANGES.md index 0d2936254c5e..e42fe6acebc5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -97,6 +97,7 @@ reading data by exporting to JSON files. This has small differences in behavior for Time and Date-related fields. See Pydoc for more information. * Add dispositions for SnowflakeIO.write ([BEAM-10343](https://issues.apache.org/jira/browse/BEAM-10343)) +* Add cross-language support to SnowflakeIO.Read([BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897)). ## New Features / Improvements diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 3787a437a4b8..81ec8f315d75 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -122,6 +122,7 @@ func main() { filepath.Join(jarsDir, "beam-sdks-java-harness.jar"), filepath.Join(jarsDir, "beam-sdks-java-io-kafka.jar"), filepath.Join(jarsDir, "kafka-clients.jar"), + filepath.Join(jarsDir, "beam-sdks-java-io-snowflake.jar"), } var hasWorkerExperiment = strings.Contains(options, "use_staged_dataflow_worker_jar") diff --git a/sdks/java/container/build.gradle b/sdks/java/container/build.gradle index 92a46b0b6e25..cfff685c63cd 100644 --- a/sdks/java/container/build.gradle +++ b/sdks/java/container/build.gradle @@ -49,6 +49,7 @@ dependencies { dockerDependency project(":sdks:java:io:kafka") // This dependency is set to 'provided' scope in :sdks:java:io:kafka dockerDependency library.java.kafka_clients + dockerDependency project(path: ":sdks:java:io:snowflake", configuration: "shadow") } def dockerfileName = project.findProperty('dockerfile') ?: 'Dockerfile' @@ -60,6 +61,8 @@ task copyDockerfileDependencies(type: Copy) { rename 'beam-sdks-java-harness-.*.jar', 'beam-sdks-java-harness.jar' rename 'beam-sdks-java-io-kafka.*.jar', 'beam-sdks-java-io-kafka.jar' rename 'kafka-clients.*.jar', 'kafka-clients.jar' + rename 'beam-sdks-java-io-snowflake.*SNAPSHOT.jar', 'beam-sdks-java-io-snowflake.jar' + into "build/target" } diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle index f9f09bada9db..4e8a390de73f 100644 --- a/sdks/java/expansion-service/build.gradle +++ b/sdks/java/expansion-service/build.gradle @@ -44,6 +44,8 @@ dependencies { compile library.java.slf4j_api runtimeOnly library.java.slf4j_jdk14 testCompile library.java.junit + + runtime project(":sdks:java:io:snowflake") } task runExpansionService (type: JavaExec) { diff --git a/sdks/java/io/snowflake/build.gradle b/sdks/java/io/snowflake/build.gradle index 32ad7af53ff7..0a179f588033 100644 --- a/sdks/java/io/snowflake/build.gradle +++ b/sdks/java/io/snowflake/build.gradle @@ -22,6 +22,7 @@ applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.snowflake') provideIntegrationTestingDependencies() enableJavaPerformanceTesting() + description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake" ext.summary = "IO to read and write on Snowflake." 
dependencies { diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java index 7dd6f0fca494..708ec7b36734 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.snowflake.credentials; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings.isNullOrEmpty; + import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions; import org.apache.beam.sdk.io.snowflake.xlang.Configuration; @@ -50,20 +52,16 @@ private static SnowflakeCredentials createCredentials( String username, String password) { - if (isNotEmpty(oAuth)) { + if (!isNullOrEmpty(oAuth)) { return new OAuthTokenSnowflakeCredentials(oAuth); - } else if (isNotEmpty(privateKeyPath) - && isNotEmpty(username) - && isNotEmpty(privateKeyPassphrase)) { + } else if (!isNullOrEmpty(privateKeyPath) + && !isNullOrEmpty(username) + && !isNullOrEmpty(privateKeyPassphrase)) { return new KeyPairSnowflakeCredentials(username, privateKeyPath, privateKeyPassphrase); - } else if (isNotEmpty(username) && isNotEmpty(password)) { + } else if (!isNullOrEmpty(username) && !isNullOrEmpty(password)) { return new UsernamePasswordSnowflakeCredentials(username, password); } else { throw new RuntimeException("Can't get credentials"); } } - - private static boolean isNotEmpty(String s) { - return s != null && !s.isEmpty(); - } } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/Configuration.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/Configuration.java new file mode 100644 index 000000000000..998740d91df1 --- /dev/null +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/Configuration.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.snowflake.xlang; + +/** Parameters abstract class to expose the transforms to an external SDK. 
*/ +public abstract class Configuration { + private String serverName; + private String username; + private String password; + private String privateKeyPath; + private String privateKeyPassphrase; + private String oAuthToken; + private String database; + private String schema; + private String table; + private String query; + private String stagingBucketName; + private String storageIntegrationName; + + public String getServerName() { + return serverName; + } + + public void setServerName(String serverName) { + this.serverName = serverName; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getPrivateKeyPath() { + return privateKeyPath; + } + + public void setPrivateKeyPath(String privateKeyPath) { + this.privateKeyPath = privateKeyPath; + } + + public String getPrivateKeyPassphrase() { + return privateKeyPassphrase; + } + + public void setPrivateKeyPassphrase(String privateKeyPassphrase) { + this.privateKeyPassphrase = privateKeyPassphrase; + } + + public String getOAuthToken() { + return oAuthToken; + } + + public void setOAuthToken(String oAuthToken) { + this.oAuthToken = oAuthToken; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + public String getStagingBucketName() { + return stagingBucketName; + } + + public void setStagingBucketName(String stagingBucketName) { + this.stagingBucketName = stagingBucketName; + } + + public String getStorageIntegrationName() { + return storageIntegrationName; + } + + public void setStorageIntegrationName(String storageIntegrationName) { + this.storageIntegrationName = storageIntegrationName; + } +} diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/ExternalRead.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/ExternalRead.java new file mode 100644 index 000000000000..db645eb2d6b2 --- /dev/null +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/ExternalRead.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.snowflake.xlang; + +import com.google.auto.service.AutoService; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.util.Map; +import javax.sql.DataSource; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.snowflake.SnowflakeIO; +import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials; +import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** Exposes {@link SnowflakeIO.Read} as an external transform for cross-language usage. */ +@Experimental +@AutoService(ExternalTransformRegistrar.class) +public final class ExternalRead implements ExternalTransformRegistrar { + + public static final String URN = "beam:external:java:snowflake:read:v1"; + + @Override + public Map> knownBuilders() { + return ImmutableMap.of(URN, ReadBuilder.class); + } + + /** Parameters class to expose the transform to an external SDK. */ + public static class ReadConfiguration extends Configuration {} + + public static class ReadBuilder + implements ExternalTransformBuilder> { + public ReadBuilder() {} + + @Override + public PTransform> buildExternal(ReadConfiguration c) { + SnowflakeCredentials credentials = SnowflakeCredentialsFactory.createCredentials(c); + + SerializableFunction dataSourceSerializableFunction = + SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of( + SnowflakeIO.DataSourceConfiguration.create(credentials) + .withServerName(c.getServerName()) + .withDatabase(c.getDatabase()) + .withSchema(c.getSchema())); + + return SnowflakeIO.read() + .withStorageIntegrationName(c.getStorageIntegrationName()) + .withStagingBucketName(c.getStagingBucketName()) + .withDataSourceProviderFn(dataSourceSerializableFunction) + .withCsvMapper(CsvMapper.getCsvMapper()) + .withCoder(ByteArrayCoder.of()) + .fromTable(c.getTable()) + .fromQuery(c.getQuery()); + } + } + + private static class CsvMapper implements Serializable { + + public static SnowflakeIO.CsvMapper getCsvMapper() { + return (SnowflakeIO.CsvMapper) + parts -> { + String partsCSV = String.join(",", parts); + + return partsCSV.getBytes(Charset.defaultCharset()); + }; + } + } +} diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/package-info.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/package-info.java new file mode 100644 index 000000000000..25f1e7d9d677 --- /dev/null +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Cross-language for SnowflakeIO. */ +@Experimental(Kind.PORTABILITY) +@DefaultAnnotation(NonNull.class) +package org.apache.beam.sdk.io.snowflake.xlang; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py new file mode 100644 index 000000000000..6fc6a2c9e5c8 --- /dev/null +++ b/sdks/python/apache_beam/io/external/snowflake.py @@ -0,0 +1,135 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +from __future__ import absolute_import + +import typing + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + +__all__ = ['ReadFromSnowflake'] + + +def default_io_expansion_service(): + return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar') + + +ReadFromSnowflakeSchema = typing.NamedTuple( + 'WriteToSnowflakeSchema', + [ + ('server_name', unicode), + ('schema', unicode), + ('database', unicode), + ('staging_bucket_name', unicode), + ('storage_integration_name', unicode), + ('username', typing.Optional[unicode]), + ('password', typing.Optional[unicode]), + ('private_key_path', typing.Optional[unicode]), + ('private_key_passphrase', typing.Optional[unicode]), + ('o_auth_token', typing.Optional[unicode]), + ('table', typing.Optional[unicode]), + ('query', typing.Optional[unicode]), + ]) + + +class ReadFromSnowflake(beam.PTransform): + """An external PTransform which reads from Snowflake.""" + + URN = 'beam:external:java:snowflake:read:v1' + + def __init__( + self, + server_name, + schema, + database, + staging_bucket_name, + storage_integration_name, + csv_mapper, + username=None, + password=None, + private_key_path=None, + private_key_passphrase=None, + o_auth_token=None, + table=None, + query=None, + expansion_service=None): + """ + Initializes a read operation from Snowflake. 
+ + Required parameters: + :param server_name: full Snowflake server name with the following format account.region.gcp.snowflakecomputing.com. + :param schema: name of the Snowflake schema in the database to use. + :param database: name of the Snowflake database to use. + :param staging_bucket_name: name of the Google Cloud Storage bucket. Bucket will be used as a temporary location + for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` + and they will be removed automatically once Read operation finishes. + :param storage_integration_name: is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html) for the GCS bucket. + :param csv_mapper: specifies a function which must translate user-defined object to array of strings. + SnowflakeIO uses a [COPY INTO ](https://docs.snowflake.com/en/sql-reference/sql/copy-into-location.html) + statement to move data from a Snowflake table to Google Cloud Storage as CSV files. + These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) + and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. + The csv_mapper function job is to give the user the possibility to convert the array of Strings + to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects. + Example: + ``` + def csv_mapper(strings_array): + return User(strings_array[0], int(strings_array[1]))) + ``` + :param table or query: specifies a Snowflake table name or custom SQL query + :param expansion_service: specifies URL of expansion service. + + Authentication parameters: + It's required to pass one of the following combinations of valid parameters for authentication: + :param username and password: specifies username and password for username/password authentication method. + :param private_key_path and private_key_passphrase: specifies a private key file and password for key/ pair authentication method. + :param o_auth_token: specifies access token for OAuth authentication method. 
+ """ + + self.params = ReadFromSnowflakeSchema( + server_name=server_name, + schema=schema, + database=database, + staging_bucket_name=staging_bucket_name, + storage_integration_name=storage_integration_name, + username=username, + password=password, + private_key_path=private_key_path, + private_key_passphrase=private_key_passphrase, + o_auth_token=o_auth_token, + table=table, + query=query) + self.csv_mapper = csv_mapper + self.expansion_service = expansion_service or default_io_expansion_service() + + def expand(self, pbegin): + return ( + pbegin + | ExternalTransform( + self.URN, + NamedTupleBasedPayloadBuilder(self.params), + self.expansion_service, + ) + | 'CSV to array mapper' >> beam.Map(lambda csv: csv.split(b',')) + | 'CSV mapper' >> beam.Map(self.csv_mapper)) diff --git a/website/www/site/content/en/roadmap/connectors-multi-sdk.md b/website/www/site/content/en/roadmap/connectors-multi-sdk.md index 464ad83af4ce..3b13f59bc261 100644 --- a/website/www/site/content/en/roadmap/connectors-multi-sdk.md +++ b/website/www/site/content/en/roadmap/connectors-multi-sdk.md @@ -80,6 +80,7 @@ Ongoing and planned work related to making existing connectors/transforms availa * Java KafkaIO - completed - [BEAM-7029](https://issues.apache.org/jira/browse/BEAM-7029) * Java KinesisIO - In progress - [BEAM-10137](https://issues.apache.org/jira/browse/BEAM-10137), [BEAM-10138](https://issues.apache.org/jira/browse/BEAM-10138) * Java PubSubIO - In progress - [BEAM-7738](https://issues.apache.org/jira/browse/BEAM-7738) +* Java SnowflakeIO - In progress - [BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897), [BEAM-9898](https://issues.apache.org/jira/browse/BEAM-9898) * Java SpannerIO - In progress - [BEAM-10139](https://issues.apache.org/jira/browse/BEAM-10139), [BEAM-10140](https://issues.apache.org/jira/browse/BEAM-10140) * Java SQL - completed - [BEAM-8603](https://issues.apache.org/jira/browse/BEAM-8603) From 535a9054e7af371ae7925fab1325d9bb01c1ae37 Mon Sep 17 00:00:00 2001 From: "pawel.urbanowicz" Date: Thu, 9 Jul 2020 08:38:09 +0200 Subject: [PATCH 3/6] fix: python lint --- .../apache_beam/io/external/snowflake.py | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py index 6fc6a2c9e5c8..aa06bce033eb 100644 --- a/sdks/python/apache_beam/io/external/snowflake.py +++ b/sdks/python/apache_beam/io/external/snowflake.py @@ -78,32 +78,41 @@ def __init__( Initializes a read operation from Snowflake. Required parameters: - :param server_name: full Snowflake server name with the following format account.region.gcp.snowflakecomputing.com. + :param server_name: full Snowflake server name with the following format + account.region.gcp.snowflakecomputing.com. :param schema: name of the Snowflake schema in the database to use. :param database: name of the Snowflake database to use. - :param staging_bucket_name: name of the Google Cloud Storage bucket. Bucket will be used as a temporary location - for storing CSV files. Those temporary directories will be named `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` - and they will be removed automatically once Read operation finishes. - :param storage_integration_name: is the name of a Snowflake storage integration object created according to [Snowflake documentation](https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html) for the GCS bucket. 
- :param csv_mapper: specifies a function which must translate user-defined object to array of strings. - SnowflakeIO uses a [COPY INTO ](https://docs.snowflake.com/en/sql-reference/sql/copy-into-location.html) - statement to move data from a Snowflake table to Google Cloud Storage as CSV files. - These files are then downloaded via [FileIO](https://beam.apache.org/releases/javadoc/2.3.0/index.html?org/apache/beam/sdk/io/FileIO.html) - and processed line by line. Each line is split into an array of Strings using the [OpenCSV](http://opencsv.sourceforge.net/) library. - The csv_mapper function job is to give the user the possibility to convert the array of Strings - to a user-defined type, ie. GenericRecord for Avro or Parquet files, or custom objects. - Example: - ``` - def csv_mapper(strings_array): - return User(strings_array[0], int(strings_array[1]))) - ``` + :param staging_bucket_name: name of the Google Cloud Storage bucket. + Bucket will be used as a temporary location for storing CSV files. + Those temporary directories will be named + `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` + and they will be removed automatically once Read operation finishes. + :param storage_integration_name: is the name of storage integration + object created according to Snowflake documentation. + :param csv_mapper: specifies a function which must translate + user-defined object to array of strings. + SnowflakeIO uses a COPY INTO statement to + move data from a Snowflake table to Google Cloud Storage as CSV files. + These files are then downloaded via FileIO and processed line by line. + Each line is split into an array of Strings using the OpenCSV + The csv_mapper function job is to give the user the possibility to + convert the array of Strings to a user-defined type, + ie. GenericRecord for Avro or Parquet files, or custom objects. + Example: + ``` + def csv_mapper(strings_array): + return User(strings_array[0], int(strings_array[1]))) + ``` :param table or query: specifies a Snowflake table name or custom SQL query :param expansion_service: specifies URL of expansion service. Authentication parameters: - It's required to pass one of the following combinations of valid parameters for authentication: - :param username and password: specifies username and password for username/password authentication method. - :param private_key_path and private_key_passphrase: specifies a private key file and password for key/ pair authentication method. + It's required to pass one of the following combinations of valid parameters: + :param username and password: specifies username and password + for username/password authentication method. + :param private_key_path and private_key_passphrase: + specifies a private key file and password + for key/ pair authentication method. :param o_auth_token: specifies access token for OAuth authentication method. 
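        Example usage (an illustrative sketch only, not part of this patch: the account, bucket,
        storage integration and credential values below are placeholders, and a running expansion
        service plus a portable runner are assumed):
        ```
        import apache_beam as beam
        from apache_beam.io.external.snowflake import ReadFromSnowflake

        def csv_mapper(fields):
            # Each element is one CSV column value, delivered as bytes.
            return (fields[0].decode('utf-8'), int(fields[1]))

        # Pipeline options / runner setup omitted for brevity.
        with beam.Pipeline() as p:
            rows = (
                p
                | 'Read from Snowflake' >> ReadFromSnowflake(
                    server_name='account.region.gcp.snowflakecomputing.com',  # placeholder
                    schema='PUBLIC',
                    database='TEST_DATABASE',
                    staging_bucket_name='bucket',                 # placeholder GCS bucket
                    storage_integration_name='integration_name',  # placeholder integration
                    csv_mapper=csv_mapper,
                    username='username',
                    password='password',
                    table='TEST_TABLE',
                    expansion_service='localhost:8097'))          # placeholder service address
        ```
        If expansion_service is omitted, the transform falls back to the
        default_io_expansion_service() defined in this module.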
""" From f632a8a7a4447463180a8deda2f1d404b5865753 Mon Sep 17 00:00:00 2001 From: "pawel.urbanowicz" Date: Mon, 13 Jul 2020 10:26:40 +0200 Subject: [PATCH 4/6] refactor: revert auth mechanism and add missing docs --- sdks/java/container/boot.go | 1 - sdks/java/container/build.gradle | 3 - sdks/java/expansion-service/build.gradle | 2 - .../beam/sdk/io/snowflake/SnowflakeIO.java | 83 ++++++++++--------- .../KeyPairSnowflakeCredentials.java | 14 +--- .../OAuthTokenSnowflakeCredentials.java | 7 -- .../credentials/SnowflakeCredentials.java | 6 +- .../SnowflakeCredentialsFactory.java | 68 +++++++-------- .../UsernamePasswordSnowflakeCredentials.java | 7 -- .../Configuration.java | 2 +- .../SnowflakeReadRegistrar.java} | 6 +- .../package-info.java | 2 +- .../unit/DataSourceConfigurationTest.java | 6 ++ .../ExternalReadCredentialsTest.java | 73 ---------------- .../OAuthTokenSnowflakeCredentialsTest.java | 4 +- .../SnowflakeCredentialsFactoryTest.java | 3 +- ...rnamePasswordSnowflakeCredentialsTest.java | 4 +- .../apache_beam/io/external/snowflake.py | 41 +++++++++ 18 files changed, 145 insertions(+), 187 deletions(-) rename sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/{xlang => crosslanguage}/Configuration.java (98%) rename sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/{xlang/ExternalRead.java => crosslanguage/SnowflakeReadRegistrar.java} (96%) rename sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/{xlang => crosslanguage}/package-info.java (95%) delete mode 100644 sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/ExternalReadCredentialsTest.java diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 81ec8f315d75..3787a437a4b8 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -122,7 +122,6 @@ func main() { filepath.Join(jarsDir, "beam-sdks-java-harness.jar"), filepath.Join(jarsDir, "beam-sdks-java-io-kafka.jar"), filepath.Join(jarsDir, "kafka-clients.jar"), - filepath.Join(jarsDir, "beam-sdks-java-io-snowflake.jar"), } var hasWorkerExperiment = strings.Contains(options, "use_staged_dataflow_worker_jar") diff --git a/sdks/java/container/build.gradle b/sdks/java/container/build.gradle index cfff685c63cd..92a46b0b6e25 100644 --- a/sdks/java/container/build.gradle +++ b/sdks/java/container/build.gradle @@ -49,7 +49,6 @@ dependencies { dockerDependency project(":sdks:java:io:kafka") // This dependency is set to 'provided' scope in :sdks:java:io:kafka dockerDependency library.java.kafka_clients - dockerDependency project(path: ":sdks:java:io:snowflake", configuration: "shadow") } def dockerfileName = project.findProperty('dockerfile') ?: 'Dockerfile' @@ -61,8 +60,6 @@ task copyDockerfileDependencies(type: Copy) { rename 'beam-sdks-java-harness-.*.jar', 'beam-sdks-java-harness.jar' rename 'beam-sdks-java-io-kafka.*.jar', 'beam-sdks-java-io-kafka.jar' rename 'kafka-clients.*.jar', 'kafka-clients.jar' - rename 'beam-sdks-java-io-snowflake.*SNAPSHOT.jar', 'beam-sdks-java-io-snowflake.jar' - into "build/target" } diff --git a/sdks/java/expansion-service/build.gradle b/sdks/java/expansion-service/build.gradle index 4e8a390de73f..f9f09bada9db 100644 --- a/sdks/java/expansion-service/build.gradle +++ b/sdks/java/expansion-service/build.gradle @@ -44,8 +44,6 @@ dependencies { compile library.java.slf4j_api runtimeOnly library.java.slf4j_jdk14 testCompile library.java.junit - - runtime project(":sdks:java:io:snowflake") } task runExpansionService (type: 
JavaExec) { diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java index 8d04c1a0a7d9..61757325b463 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.Serializable; import java.security.PrivateKey; +import java.sql.Connection; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -938,6 +939,9 @@ public abstract static class DataSourceConfiguration implements Serializable { @Nullable public abstract Boolean getSsl(); + @Nullable + public abstract Boolean getValidate(); + @Nullable public abstract DataSource getDataSource(); @@ -971,6 +975,8 @@ abstract static class Builder { abstract Builder setSsl(Boolean ssl); + abstract Builder setValidate(Boolean validate); + abstract Builder setDataSource(DataSource dataSource); abstract DataSourceConfiguration build(); @@ -984,6 +990,7 @@ abstract static class Builder { public static DataSourceConfiguration create(DataSource dataSource) { checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable"); return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() + .setValidate(true) .setDataSource(dataSource) .build(); } @@ -994,44 +1001,26 @@ public static DataSourceConfiguration create(DataSource dataSource) { * @param credentials - an instance of {@link SnowflakeCredentials}. */ public static DataSourceConfiguration create(SnowflakeCredentials credentials) { - return credentials.createSnowflakeDataSourceConfiguration(); - } - - /** - * Creates {@link DataSourceConfiguration} from instance of {@link - * UsernamePasswordSnowflakeCredentials}. - * - * @param credentials - an instance of {@link UsernamePasswordSnowflakeCredentials}. - */ - public static DataSourceConfiguration create(UsernamePasswordSnowflakeCredentials credentials) { - return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() - .setUsername(credentials.getUsername()) - .setPassword(credentials.getPassword()) - .build(); - } - - /** - * Creates {@link DataSourceConfiguration} from instance of {@link KeyPairSnowflakeCredentials}. - * - * @param credentials - an instance of {@link KeyPairSnowflakeCredentials}. - */ - public static DataSourceConfiguration create(KeyPairSnowflakeCredentials credentials) { - return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() - .setUsername(credentials.getUsername()) - .setPrivateKey(credentials.getPrivateKey()) - .build(); - } - - /** - * Creates {@link DataSourceConfiguration} from instance of {@link - * OAuthTokenSnowflakeCredentials}. - * - * @param credentials - an instance of {@link OAuthTokenSnowflakeCredentials}. 
- */ - public static DataSourceConfiguration create(OAuthTokenSnowflakeCredentials credentials) { - return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() - .setOauthToken(credentials.getToken()) - .build(); + if (credentials instanceof UsernamePasswordSnowflakeCredentials) { + return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() + .setValidate(true) + .setUsername(((UsernamePasswordSnowflakeCredentials) credentials).getUsername()) + .setPassword(((UsernamePasswordSnowflakeCredentials) credentials).getPassword()) + .build(); + } else if (credentials instanceof OAuthTokenSnowflakeCredentials) { + return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() + .setValidate(true) + .setOauthToken(((OAuthTokenSnowflakeCredentials) credentials).getToken()) + .build(); + } else if (credentials instanceof KeyPairSnowflakeCredentials) { + return new AutoValue_SnowflakeIO_DataSourceConfiguration.Builder() + .setValidate(true) + .setUsername(((KeyPairSnowflakeCredentials) credentials).getUsername()) + .setPrivateKey(((KeyPairSnowflakeCredentials) credentials).getPrivateKey()) + .build(); + } + throw new IllegalArgumentException( + "Can't create DataSourceConfiguration from given credentials"); } /** @@ -1121,6 +1110,15 @@ public DataSourceConfiguration withLoginTimeout(Integer loginTimeout) { return builder().setLoginTimeout(loginTimeout).build(); } + /** + * Disables validation of connection parameters prior to pipeline submission. + * + * @return + */ + public DataSourceConfiguration withoutValidation() { + return builder().setValidate(false).build(); + } + void populateDisplayData(DisplayData.Builder builder) { if (getDataSource() != null) { builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName())); @@ -1196,6 +1194,15 @@ public static class DataSourceProviderFromDataSourceConfiguration private final DataSourceConfiguration config; private DataSourceProviderFromDataSourceConfiguration(DataSourceConfiguration config) { + if (config.getValidate()) { + try { + Connection connection = config.buildDatasource().getConnection(); + connection.close(); + } catch (SQLException e) { + throw new IllegalArgumentException( + "Invalid DataSourceConfiguration. Underlying cause: " + e); + } + } this.config = config; } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/KeyPairSnowflakeCredentials.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/KeyPairSnowflakeCredentials.java index ea6d2d19a1d3..286ec628fc8b 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/KeyPairSnowflakeCredentials.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/KeyPairSnowflakeCredentials.java @@ -31,17 +31,16 @@ import javax.crypto.EncryptedPrivateKeyInfo; import javax.crypto.SecretKeyFactory; import javax.crypto.spec.PBEKeySpec; -import org.apache.beam.sdk.io.snowflake.SnowflakeIO; /** POJO for handling Key-Pair authentication against Snowflake. 
*/ public class KeyPairSnowflakeCredentials implements SnowflakeCredentials { - private final String username; - private final PrivateKey privateKey; + private String username; + private PrivateKey privateKey; public KeyPairSnowflakeCredentials( - String username, String privateKeyPath, String privateKeyPassphrase) { + String username, String privateKeyPath, String privateKeyPassword) { this.username = username; - this.privateKey = getPrivateKey(privateKeyPath, privateKeyPassphrase); + this.privateKey = getPrivateKey(privateKeyPath, privateKeyPassword); } public KeyPairSnowflakeCredentials(String username, PrivateKey privateKey) { @@ -49,11 +48,6 @@ public KeyPairSnowflakeCredentials(String username, PrivateKey privateKey) { this.privateKey = privateKey; } - @Override - public SnowflakeIO.DataSourceConfiguration createSnowflakeDataSourceConfiguration() { - return SnowflakeIO.DataSourceConfiguration.create(this); - } - private PrivateKey getPrivateKey(String privateKeyPath, String privateKeyPassphrase) { try { byte[] keyBytes = Files.readAllBytes(Paths.get(privateKeyPath)); diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/OAuthTokenSnowflakeCredentials.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/OAuthTokenSnowflakeCredentials.java index 8f538f7cbcec..be102a8b0f41 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/OAuthTokenSnowflakeCredentials.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/OAuthTokenSnowflakeCredentials.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.io.snowflake.credentials; -import org.apache.beam.sdk.io.snowflake.SnowflakeIO; - /** POJO for handling OAuth authentication against Snowflake, using pre-obtained OAuth token. */ public class OAuthTokenSnowflakeCredentials implements SnowflakeCredentials { private String token; @@ -30,9 +28,4 @@ public OAuthTokenSnowflakeCredentials(String token) { public String getToken() { return token; } - - @Override - public SnowflakeIO.DataSourceConfiguration createSnowflakeDataSourceConfiguration() { - return SnowflakeIO.DataSourceConfiguration.create(this); - } } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentials.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentials.java index 7e127c51c5e4..e3abf91f7d12 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentials.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentials.java @@ -17,12 +17,8 @@ */ package org.apache.beam.sdk.io.snowflake.credentials; -import org.apache.beam.sdk.io.snowflake.SnowflakeIO; - /** * Interface for holding credentials. Allows creating {@link * org.apache.beam.sdk.io.snowflake.SnowflakeIO.DataSourceConfiguration}. 
*/ -public interface SnowflakeCredentials { - SnowflakeIO.DataSourceConfiguration createSnowflakeDataSourceConfiguration(); -} +public interface SnowflakeCredentials {} diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java index 708ec7b36734..2b45dc19338b 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/SnowflakeCredentialsFactory.java @@ -17,10 +17,8 @@ */ package org.apache.beam.sdk.io.snowflake.credentials; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings.isNullOrEmpty; - import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions; -import org.apache.beam.sdk.io.snowflake.xlang.Configuration; +import org.apache.beam.sdk.io.snowflake.crosslanguage.SnowflakeReadRegistrar; /** * Factory class for creating implementations of {@link SnowflakeCredentials} from {@link @@ -28,40 +26,44 @@ */ public class SnowflakeCredentialsFactory { public static SnowflakeCredentials of(SnowflakePipelineOptions o) { - return createCredentials( - o.getOauthToken(), - o.getPrivateKeyPath(), - o.getPrivateKeyPassphrase(), - o.getUsername(), - o.getPassword()); + if (oauthOptionsAvailable(o.getOauthToken())) { + return new OAuthTokenSnowflakeCredentials(o.getOauthToken()); + } else if (usernamePasswordOptionsAvailable(o.getUsername(), o.getPassword())) { + return new UsernamePasswordSnowflakeCredentials(o.getUsername(), o.getPassword()); + } else if (keyPairOptionsAvailable( + o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase())) { + return new KeyPairSnowflakeCredentials( + o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase()); + } + throw new RuntimeException("Can't get credentials from Options"); } - public static SnowflakeCredentials createCredentials(Configuration c) { - return createCredentials( - c.getOAuthToken(), - c.getPrivateKeyPath(), - c.getPrivateKeyPassphrase(), - c.getUsername(), - c.getPassword()); + public static SnowflakeCredentials of(SnowflakeReadRegistrar.ReadConfiguration c) { + if (oauthOptionsAvailable(c.getOAuthToken())) { + return new OAuthTokenSnowflakeCredentials(c.getOAuthToken()); + } else if (usernamePasswordOptionsAvailable(c.getUsername(), c.getPassword())) { + return new UsernamePasswordSnowflakeCredentials(c.getUsername(), c.getPassword()); + } else if (keyPairOptionsAvailable( + c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase())) { + return new KeyPairSnowflakeCredentials( + c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase()); + } + throw new RuntimeException("Can't get credentials from Options"); } - private static SnowflakeCredentials createCredentials( - String oAuth, - String privateKeyPath, - String privateKeyPassphrase, - String username, - String password) { + private static boolean oauthOptionsAvailable(String token) { + return token != null && !token.isEmpty(); + } - if (!isNullOrEmpty(oAuth)) { - return new OAuthTokenSnowflakeCredentials(oAuth); - } else if (!isNullOrEmpty(privateKeyPath) - && !isNullOrEmpty(username) - && !isNullOrEmpty(privateKeyPassphrase)) { - return new KeyPairSnowflakeCredentials(username, privateKeyPath, privateKeyPassphrase); - } else if (!isNullOrEmpty(username) && 
!isNullOrEmpty(password)) { - return new UsernamePasswordSnowflakeCredentials(username, password); - } else { - throw new RuntimeException("Can't get credentials"); - } + private static boolean usernamePasswordOptionsAvailable(String username, String password) { + return username != null && !username.isEmpty() && !password.isEmpty(); + } + + private static boolean keyPairOptionsAvailable( + String username, String privateKeyPath, String privateKeyPassphrase) { + return username != null + && !username.isEmpty() + && !privateKeyPath.isEmpty() + && !privateKeyPassphrase.isEmpty(); } } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/UsernamePasswordSnowflakeCredentials.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/UsernamePasswordSnowflakeCredentials.java index 6e1cb321e695..1d8bdce0cc2e 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/UsernamePasswordSnowflakeCredentials.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/credentials/UsernamePasswordSnowflakeCredentials.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.io.snowflake.credentials; -import org.apache.beam.sdk.io.snowflake.SnowflakeIO; - /** POJO for handling Username & Password authentication against Snowflake. */ public class UsernamePasswordSnowflakeCredentials implements SnowflakeCredentials { private String username; @@ -36,9 +34,4 @@ public String getUsername() { public String getPassword() { return password; } - - @Override - public SnowflakeIO.DataSourceConfiguration createSnowflakeDataSourceConfiguration() { - return SnowflakeIO.DataSourceConfiguration.create(this); - } } diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/Configuration.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java similarity index 98% rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/Configuration.java rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java index 998740d91df1..38162aea4fc0 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/Configuration.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/Configuration.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.snowflake.xlang; +package org.apache.beam.sdk.io.snowflake.crosslanguage; /** Parameters abstract class to expose the transforms to an external SDK. 
*/ public abstract class Configuration { diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/ExternalRead.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java similarity index 96% rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/ExternalRead.java rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java index db645eb2d6b2..d8b51b048b12 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/ExternalRead.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.snowflake.xlang; +package org.apache.beam.sdk.io.snowflake.crosslanguage; import com.google.auto.service.AutoService; import java.io.Serializable; @@ -38,7 +38,7 @@ /** Exposes {@link SnowflakeIO.Read} as an external transform for cross-language usage. */ @Experimental @AutoService(ExternalTransformRegistrar.class) -public final class ExternalRead implements ExternalTransformRegistrar { +public final class SnowflakeReadRegistrar implements ExternalTransformRegistrar { public static final String URN = "beam:external:java:snowflake:read:v1"; @@ -56,7 +56,7 @@ public ReadBuilder() {} @Override public PTransform> buildExternal(ReadConfiguration c) { - SnowflakeCredentials credentials = SnowflakeCredentialsFactory.createCredentials(c); + SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c); SerializableFunction dataSourceSerializableFunction = SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of( diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/package-info.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java similarity index 95% rename from sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/package-info.java rename to sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java index 25f1e7d9d677..7e24ee94770c 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/xlang/package-info.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/package-info.java @@ -19,7 +19,7 @@ /** Cross-language for SnowflakeIO. 
*/ @Experimental(Kind.PORTABILITY) @DefaultAnnotation(NonNull.class) -package org.apache.beam.sdk.io.snowflake.xlang; +package org.apache.beam.sdk.io.snowflake.crosslanguage; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java index cef1fc46c68c..b0400c8aafbe 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/DataSourceConfigurationTest.java @@ -114,6 +114,12 @@ public void testSettingStringLoginTimeout() { assertEquals(loginTimeout, configuration.getLoginTimeout()); } + @Test + public void testSettingValidate() { + configuration = configuration.withoutValidation(); + assertEquals(false, configuration.getValidate()); + } + @Test public void testDataSourceCreatedFromUrl() { String url = "jdbc:snowflake://account.snowflakecomputing.com"; diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/ExternalReadCredentialsTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/ExternalReadCredentialsTest.java deleted file mode 100644 index 506266fe5d48..000000000000 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/ExternalReadCredentialsTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.snowflake.test.unit.credentials; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; - -import org.apache.beam.sdk.io.snowflake.test.TestUtils; -import org.apache.beam.sdk.io.snowflake.xlang.ExternalRead; -import org.apache.beam.sdk.io.snowflake.xlang.ExternalRead.ReadConfiguration; -import org.junit.Test; - -public class ExternalReadCredentialsTest { - private static final String SERVER_NAME = "server_name.snowflakecomputing.com"; - private static final String DATABASE = "test_database"; - private static final String SCHEMA = "public"; - private static final String STAGING_BUCKET_NAME = "bucket/"; - - @Test - public void testBuildExternalTransformWithoutCredentials() { - ReadConfiguration configuration = createTestConfiguration(); - assertThrows( - RuntimeException.class, () -> new ExternalRead.ReadBuilder().buildExternal(configuration)); - } - - @Test - public void testBuildExternalTransformUsingOAuthToken() { - ReadConfiguration configuration = createTestConfiguration(); - configuration.setOAuthToken("token"); - assertNotNull(new ExternalRead.ReadBuilder().buildExternal(configuration)); - } - - @Test - public void testBuildExternalTransformUsingUsernameAndPassword() { - ReadConfiguration configuration = createTestConfiguration(); - configuration.setUsername("username"); - configuration.setPassword("password"); - assertNotNull(new ExternalRead.ReadBuilder().buildExternal(configuration)); - } - - @Test - public void testBuildExternalTransformUsingKeyPair() { - ReadConfiguration configuration = createTestConfiguration(); - configuration.setUsername("username"); - configuration.setPrivateKeyPath(TestUtils.getPrivateKeyPath(getClass())); - configuration.setPrivateKeyPassphrase(TestUtils.getPrivateKeyPassphrase()); - assertNotNull(new ExternalRead.ReadBuilder().buildExternal(configuration)); - } - - private ReadConfiguration createTestConfiguration() { - ReadConfiguration configuration = new ReadConfiguration(); - configuration.setServerName(SERVER_NAME); - configuration.setDatabase(DATABASE); - configuration.setSchema(SCHEMA); - configuration.setStagingBucketName(STAGING_BUCKET_NAME); - return configuration; - } -} diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/OAuthTokenSnowflakeCredentialsTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/OAuthTokenSnowflakeCredentialsTest.java index 8f3d774f397a..a1dee76f1601 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/OAuthTokenSnowflakeCredentialsTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/OAuthTokenSnowflakeCredentialsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.snowflake.test.unit.credentials; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.io.snowflake.SnowflakeIO; import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials; @@ -37,8 +38,9 @@ public void testBuildingDataSource() { OAuthTokenSnowflakeCredentials credentials = new OAuthTokenSnowflakeCredentials("token"); SnowflakeIO.DataSourceConfiguration configuration = - credentials.createSnowflakeDataSourceConfiguration(); + SnowflakeIO.DataSourceConfiguration.create(credentials); assertEquals(credentials.getToken(), configuration.getOauthToken()); + 
assertTrue(configuration.getValidate()); } } diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/SnowflakeCredentialsFactoryTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/SnowflakeCredentialsFactoryTest.java index 8e8d94d06c72..f9f612d8f002 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/SnowflakeCredentialsFactoryTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/SnowflakeCredentialsFactoryTest.java @@ -56,6 +56,7 @@ public void oauthTokenTest() { @Test public void keyPairTest() { SnowflakePipelineOptions options = PipelineOptionsFactory.as(SnowflakePipelineOptions.class); + System.out.println(TestUtils.getPrivateKeyPath(getClass())); options.setUsername("username"); options.setPrivateKeyPath(TestUtils.getPrivateKeyPath(getClass())); options.setPrivateKeyPassphrase(TestUtils.getPrivateKeyPassphrase()); @@ -71,6 +72,6 @@ public void emptyOptionsTest() { Exception ex = assertThrows(RuntimeException.class, () -> SnowflakeCredentialsFactory.of(options)); - assertEquals("Can't get credentials", ex.getMessage()); + assertEquals("Can't get credentials from Options", ex.getMessage()); } } diff --git a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/UsernamePasswordSnowflakeCredentialsTest.java b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/UsernamePasswordSnowflakeCredentialsTest.java index e34dc57c2c33..0c7503a079b8 100644 --- a/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/UsernamePasswordSnowflakeCredentialsTest.java +++ b/sdks/java/io/snowflake/src/test/java/org/apache/beam/sdk/io/snowflake/test/unit/credentials/UsernamePasswordSnowflakeCredentialsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.snowflake.test.unit.credentials; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.io.snowflake.SnowflakeIO; import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials; @@ -40,9 +41,10 @@ public void testBuildingDataSource() { new UsernamePasswordSnowflakeCredentials("username", "password"); SnowflakeIO.DataSourceConfiguration configuration = - credentials.createSnowflakeDataSourceConfiguration(); + SnowflakeIO.DataSourceConfiguration.create(credentials); assertEquals("username", configuration.getUsername()); assertEquals("password", configuration.getPassword()); + assertTrue(configuration.getValidate()); } } diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py index aa06bce033eb..16cdca8e7476 100644 --- a/sdks/python/apache_beam/io/external/snowflake.py +++ b/sdks/python/apache_beam/io/external/snowflake.py @@ -28,6 +28,47 @@ from apache_beam.transforms.external import ExternalTransform from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder +"""Snowflake transforms tested against Flink portable runner. + **Setup** + Transforms provided in this module are cross-language transforms + implemented in the Beam Java SDK. During the pipeline construction, Python SDK + will connect to a Java expansion service to expand these transforms. + To facilitate this, a small amount of setup is needed before using these + transforms in a Beam Python pipeline. 
+ There are several ways to setup cross-language Snowflake transforms. + * Option 1: use the default expansion service + * Option 2: specify a custom expansion service + See below for details regarding each of these options. + *Option 1: Use the default expansion service* + This is the recommended and easiest setup option for using Python Kafka + transforms. This option is only available for Beam 2.22.0 and later. + This option requires following pre-requisites before running the Beam + pipeline. + * Install Java runtime in the computer from where the pipeline is constructed + and make sure that 'java' command is available. + In this option, Python SDK will either download (for released Beam version) or + build (when running from a Beam Git clone) a expansion service jar and use + that to expand transforms. Currently Snowflake transforms use the + 'beam-sdks-java-io-expansion-service' jar for this purpose. + *Option 2: specify a custom expansion service* + In this option, you startup your own expansion service and provide that as + a parameter when using the transforms provided in this module. + This option requires following pre-requisites before running the Beam + pipeline. + * Startup your own expansion service. + * Update your pipeline to provide the expansion service address when + initiating Snowflake transforms provided in this module. + Flink Users can use the built-in Expansion Service of the Flink Runner's + Job Server. If you start Flink's Job Server, the expansion service will be + started on port 8097. For a different address, please set the + expansion_service parameter. + **More information** + For more information regarding cross-language transforms see: + - https://beam.apache.org/roadmap/portability/ + For more information specific to Flink runner see: + - https://beam.apache.org/documentation/runners/flink/ +""" + __all__ = ['ReadFromSnowflake'] From 5867b3a2beefb865ba729d0145ea357b864f951a Mon Sep 17 00:00:00 2001 From: "pawel.urbanowicz" Date: Tue, 28 Jul 2020 16:00:44 +0200 Subject: [PATCH 5/6] feat: add custom expansion-service --- .../snowflake/expansion-service/build.gradle | 38 ++++++ .../beam/sdk/io/snowflake/SnowflakeIO.java | 107 ++++++---------- .../apache_beam/io/external/snowflake.py | 116 +++++++++--------- settings.gradle | 1 + 4 files changed, 136 insertions(+), 126 deletions(-) create mode 100644 sdks/java/io/snowflake/expansion-service/build.gradle diff --git a/sdks/java/io/snowflake/expansion-service/build.gradle b/sdks/java/io/snowflake/expansion-service/build.gradle new file mode 100644 index 000000000000..8a6ea6c05f07 --- /dev/null +++ b/sdks/java/io/snowflake/expansion-service/build.gradle @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
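As an editorial aside on the two setup options described in the module docstring above, here is a minimal sketch of how a pipeline author might select an expansion service from the Python side. The import path and the `default_io_expansion_service()` helper come from this patch series; the `localhost:8097` address is only an example taken from the Flink Job Server note, not a default of this module.

```python
# Hypothetical sketch: choosing an expansion service for the Snowflake transforms.
from apache_beam.io.external.snowflake import default_io_expansion_service

# Option 1: let the Python SDK download or build the bundled expansion-service jar.
expansion_service = default_io_expansion_service()

# Option 2: point at an expansion service started separately, e.g. the one
# exposed by Flink's Job Server on port 8097 (see the docstring above).
# expansion_service = 'localhost:8097'
```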
+ */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature(enableChecker:false, + automaticModuleName: 'org.apache.beam.sdk.io.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake ::Expansion Service" + ext.summary = "Expansion service serving Snowflake IO" + +dependencies { + compile project(":sdks:java:expansion-service") + compile project(":sdks:java:io:snowflake") + runtime library.java.slf4j_jdk14 +} + diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java index 61757325b463..845f137d04bf 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java @@ -35,7 +35,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import javax.annotation.Nullable; import javax.sql.DataSource; import net.snowflake.client.jdbc.SnowflakeBasicDataSource; import org.apache.beam.sdk.annotations.Experimental; @@ -79,6 +78,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -232,29 +232,22 @@ public static Write write() { /** Implementation of {@link #read()}. */ @AutoValue public abstract static class Read extends PTransform> { - @Nullable - abstract SerializableFunction getDataSourceProviderFn(); - @Nullable - abstract String getQuery(); + abstract @Nullable SerializableFunction getDataSourceProviderFn(); - @Nullable - abstract String getTable(); + abstract @Nullable String getQuery(); - @Nullable - abstract String getStorageIntegrationName(); + abstract @Nullable String getTable(); - @Nullable - abstract String getStagingBucketName(); + abstract @Nullable String getStorageIntegrationName(); - @Nullable - abstract CsvMapper getCsvMapper(); + abstract @Nullable String getStagingBucketName(); - @Nullable - abstract Coder getCoder(); + abstract @Nullable CsvMapper getCsvMapper(); - @Nullable - abstract SnowflakeService getSnowflakeService(); + abstract @Nullable Coder getCoder(); + + abstract @Nullable SnowflakeService getSnowflakeService(); abstract Builder toBuilder(); @@ -519,38 +512,28 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Implementation of {@link #write()}. 
*/ @AutoValue public abstract static class Write extends PTransform, PDone> { - @Nullable - abstract SerializableFunction getDataSourceProviderFn(); - @Nullable - abstract String getTable(); + abstract @Nullable SerializableFunction getDataSourceProviderFn(); + + abstract @Nullable String getTable(); - @Nullable - abstract String getStorageIntegrationName(); + abstract @Nullable String getStorageIntegrationName(); - @Nullable - abstract String getStagingBucketName(); + abstract @Nullable String getStagingBucketName(); - @Nullable - abstract String getQuery(); + abstract @Nullable String getQuery(); - @Nullable - abstract String getFileNameTemplate(); + abstract @Nullable String getFileNameTemplate(); - @Nullable - abstract WriteDisposition getWriteDisposition(); + abstract @Nullable WriteDisposition getWriteDisposition(); - @Nullable - abstract CreateDisposition getCreateDisposition(); + abstract @Nullable CreateDisposition getCreateDisposition(); - @Nullable - abstract SnowflakeTableSchema getTableSchema(); + abstract @Nullable SnowflakeTableSchema getTableSchema(); - @Nullable - abstract UserDataMapper getUserDataMapper(); + abstract @Nullable UserDataMapper getUserDataMapper(); - @Nullable - abstract SnowflakeService getSnowflakeService(); + abstract @Nullable SnowflakeService getSnowflakeService(); abstract Builder toBuilder(); @@ -900,50 +883,36 @@ public void processElement(ProcessContext context) throws Exception { */ @AutoValue public abstract static class DataSourceConfiguration implements Serializable { - @Nullable - public abstract String getUrl(); - @Nullable - public abstract String getUsername(); + public abstract @Nullable String getUrl(); + + public abstract @Nullable String getUsername(); - @Nullable - public abstract String getPassword(); + public abstract @Nullable String getPassword(); - @Nullable - public abstract PrivateKey getPrivateKey(); + public abstract @Nullable PrivateKey getPrivateKey(); - @Nullable - public abstract String getOauthToken(); + public abstract @Nullable String getOauthToken(); - @Nullable - public abstract String getDatabase(); + public abstract @Nullable String getDatabase(); - @Nullable - public abstract String getWarehouse(); + public abstract @Nullable String getWarehouse(); - @Nullable - public abstract String getSchema(); + public abstract @Nullable String getSchema(); - @Nullable - public abstract String getServerName(); + public abstract @Nullable String getServerName(); - @Nullable - public abstract Integer getPortNumber(); + public abstract @Nullable Integer getPortNumber(); - @Nullable - public abstract String getRole(); + public abstract @Nullable String getRole(); - @Nullable - public abstract Integer getLoginTimeout(); + public abstract @Nullable Integer getLoginTimeout(); - @Nullable - public abstract Boolean getSsl(); + public abstract @Nullable Boolean getSsl(); - @Nullable - public abstract Boolean getValidate(); + public abstract @Nullable Boolean getValidate(); - @Nullable - public abstract DataSource getDataSource(); + public abstract @Nullable DataSource getDataSource(); abstract Builder builder(); diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py index 16cdca8e7476..c0e681a70474 100644 --- a/sdks/python/apache_beam/io/external/snowflake.py +++ b/sdks/python/apache_beam/io/external/snowflake.py @@ -15,19 +15,6 @@ # limitations under the License. 
# -# pytype: skip-file - -from __future__ import absolute_import - -import typing - -from past.builtins import unicode - -import apache_beam as beam -from apache_beam.transforms.external import BeamJarExpansionService -from apache_beam.transforms.external import ExternalTransform -from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder - """Snowflake transforms tested against Flink portable runner. **Setup** Transforms provided in this module are cross-language transforms @@ -40,10 +27,9 @@ * Option 2: specify a custom expansion service See below for details regarding each of these options. *Option 1: Use the default expansion service* - This is the recommended and easiest setup option for using Python Kafka - transforms. This option is only available for Beam 2.22.0 and later. - This option requires following pre-requisites before running the Beam - pipeline. + This is the recommended and easiest setup option for using Python Snowflake + transforms.This option requires following pre-requisites + before running the Beam pipeline. * Install Java runtime in the computer from where the pipeline is constructed and make sure that 'java' command is available. In this option, Python SDK will either download (for released Beam version) or @@ -69,11 +55,25 @@ - https://beam.apache.org/documentation/runners/flink/ """ +# pytype: skip-file + +from __future__ import absolute_import + +import typing + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import ExternalTransform +from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder + __all__ = ['ReadFromSnowflake'] def default_io_expansion_service(): - return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar') + return BeamJarExpansionService( + 'sdks:java:io:snowflake:expansion-service:shadowJar') ReadFromSnowflakeSchema = typing.NamedTuple( @@ -116,46 +116,48 @@ def __init__( query=None, expansion_service=None): """ - Initializes a read operation from Snowflake. - - Required parameters: - :param server_name: full Snowflake server name with the following format - account.region.gcp.snowflakecomputing.com. - :param schema: name of the Snowflake schema in the database to use. - :param database: name of the Snowflake database to use. - :param staging_bucket_name: name of the Google Cloud Storage bucket. - Bucket will be used as a temporary location for storing CSV files. - Those temporary directories will be named - `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` - and they will be removed automatically once Read operation finishes. - :param storage_integration_name: is the name of storage integration - object created according to Snowflake documentation. - :param csv_mapper: specifies a function which must translate - user-defined object to array of strings. - SnowflakeIO uses a COPY INTO statement to - move data from a Snowflake table to Google Cloud Storage as CSV files. - These files are then downloaded via FileIO and processed line by line. - Each line is split into an array of Strings using the OpenCSV - The csv_mapper function job is to give the user the possibility to - convert the array of Strings to a user-defined type, - ie. GenericRecord for Avro or Parquet files, or custom objects. + Initializes a read operation from Snowflake. + + Required parameters: + :param server_name: full Snowflake server name with the following format + account.region.gcp.snowflakecomputing.com. 
+ :param schema: name of the Snowflake schema in the database to use. + :param database: name of the Snowflake database to use. + :param staging_bucket_name: name of the Google Cloud Storage bucket. + Bucket will be used as a temporary location for storing CSV files. + Those temporary directories will be named + `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` + and they will be removed automatically once Read operation finishes. + :param storage_integration_name: is the name of storage integration + object created according to Snowflake documentation. + :param csv_mapper: specifies a function which must translate + user-defined object to array of strings. + SnowflakeIO uses a COPY INTO statement to move data from + a Snowflake table to Google Cloud Storage as CSV files.These files + are then downloaded via FileIO and processed line by line. + Each line is split into an array of Strings using the OpenCSV + The csv_mapper function job is to give the user the possibility to + convert the array of Strings to a user-defined type, + ie. GenericRecord for Avro or Parquet files, or custom objects. Example: - ``` - def csv_mapper(strings_array): - return User(strings_array[0], int(strings_array[1]))) - ``` - :param table or query: specifies a Snowflake table name or custom SQL query - :param expansion_service: specifies URL of expansion service. - - Authentication parameters: - It's required to pass one of the following combinations of valid parameters: - :param username and password: specifies username and password - for username/password authentication method. - :param private_key_path and private_key_passphrase: - specifies a private key file and password - for key/ pair authentication method. - :param o_auth_token: specifies access token for OAuth authentication method. - """ + ``` + def csv_mapper(strings_array): + return User(strings_array[0], int(strings_array[1]))) + ``` + :param table or query: specifies a Snowflake table name or + custom SQL query + :param expansion_service: specifies URL of expansion service. + + Authentication parameters: + It's required to pass one of the following combinations of parameters: + :param username and password: specifies username and password + for username/password authentication method. + :param private_key_path and private_key_passphrase: + specifies a private key file and password + for key/ pair authentication method. + :param o_auth_token: specifies access token for + OAuth authentication method. 
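To make the parameter documentation above concrete, a minimal read sketch using the username/password combination. The server name, bucket, integration, credentials and table values are placeholders, and the pipeline assumes the Java/expansion-service prerequisites from Option 1 plus a runner with cross-language support (the docstring mentions the Flink portable runner).

```python
# Hypothetical usage sketch for ReadFromSnowflake; all identifiers are placeholders.
import apache_beam as beam
from apache_beam.io.external.snowflake import ReadFromSnowflake


def csv_mapper(strings_array):
  # Pass the OpenCSV string array through unchanged; a real pipeline would
  # map it to a user-defined type here.
  return strings_array


# Requires a runner with cross-language support, e.g. the Flink portable runner.
with beam.Pipeline() as p:
  rows = (
      p
      | ReadFromSnowflake(
          server_name='account.region.gcp.snowflakecomputing.com',
          schema='PUBLIC',
          database='TEST_DATABASE',
          staging_bucket_name='bucket/',
          storage_integration_name='storage_integration',
          csv_mapper=csv_mapper,
          username='username',
          password='password',
          table='TEST_TABLE'))
```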
+ """ self.params = ReadFromSnowflakeSchema( server_name=server_name, diff --git a/settings.gradle b/settings.gradle index 02208578af26..638c216b6231 100644 --- a/settings.gradle +++ b/settings.gradle @@ -120,6 +120,7 @@ include ":sdks:java:io:rabbitmq" include ":sdks:java:io:redis" include ":sdks:java:io:solr" include ":sdks:java:io:snowflake" +include ":sdks:java:io:snowflake:expansion-service" include ":sdks:java:io:splunk" include ":sdks:java:io:thrift" include ":sdks:java:io:tika" From 951b54cbb2e22bd77aa78604dcbc78a758bfd5ca Mon Sep 17 00:00:00 2001 From: "pawel.urbanowicz" Date: Tue, 4 Aug 2020 14:05:03 +0200 Subject: [PATCH 6/6] fix: CI --- .../crosslanguage/SnowflakeReadRegistrar.java | 2 +- .../apache_beam/io/external/snowflake.py | 123 ++++++++++-------- 2 files changed, 71 insertions(+), 54 deletions(-) diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java index d8b51b048b12..1e7be0f23f7b 100644 --- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java +++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeReadRegistrar.java @@ -43,7 +43,7 @@ public final class SnowflakeReadRegistrar implements ExternalTransformRegistrar public static final String URN = "beam:external:java:snowflake:read:v1"; @Override - public Map> knownBuilders() { + public Map>> knownBuilders() { return ImmutableMap.of(URN, ReadBuilder.class); } diff --git a/sdks/python/apache_beam/io/external/snowflake.py b/sdks/python/apache_beam/io/external/snowflake.py index c0e681a70474..e7ffa6a67036 100644 --- a/sdks/python/apache_beam/io/external/snowflake.py +++ b/sdks/python/apache_beam/io/external/snowflake.py @@ -16,41 +16,58 @@ # """Snowflake transforms tested against Flink portable runner. + **Setup** + Transforms provided in this module are cross-language transforms implemented in the Beam Java SDK. During the pipeline construction, Python SDK will connect to a Java expansion service to expand these transforms. To facilitate this, a small amount of setup is needed before using these transforms in a Beam Python pipeline. + There are several ways to setup cross-language Snowflake transforms. + * Option 1: use the default expansion service * Option 2: specify a custom expansion service + See below for details regarding each of these options. + *Option 1: Use the default expansion service* + This is the recommended and easiest setup option for using Python Snowflake transforms.This option requires following pre-requisites before running the Beam pipeline. + * Install Java runtime in the computer from where the pipeline is constructed and make sure that 'java' command is available. + In this option, Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use that to expand transforms. Currently Snowflake transforms use the 'beam-sdks-java-io-expansion-service' jar for this purpose. + *Option 2: specify a custom expansion service* + In this option, you startup your own expansion service and provide that as a parameter when using the transforms provided in this module. + This option requires following pre-requisites before running the Beam pipeline. + * Startup your own expansion service. 
* Update your pipeline to provide the expansion service address when initiating Snowflake transforms provided in this module. + Flink Users can use the built-in Expansion Service of the Flink Runner's Job Server. If you start Flink's Job Server, the expansion service will be started on port 8097. For a different address, please set the expansion_service parameter. + **More information** + For more information regarding cross-language transforms see: - https://beam.apache.org/roadmap/portability/ + For more information specific to Flink runner see: - https://beam.apache.org/documentation/runners/flink/ """ @@ -68,16 +85,8 @@ from apache_beam.transforms.external import ExternalTransform from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder -__all__ = ['ReadFromSnowflake'] - - -def default_io_expansion_service(): - return BeamJarExpansionService( - 'sdks:java:io:snowflake:expansion-service:shadowJar') - - ReadFromSnowflakeSchema = typing.NamedTuple( - 'WriteToSnowflakeSchema', + 'ReadFromSnowflakeSchema', [ ('server_name', unicode), ('schema', unicode), @@ -94,8 +103,15 @@ def default_io_expansion_service(): ]) +def default_io_expansion_service(): + return BeamJarExpansionService( + 'sdks:java:io:snowflake:expansion-service:shadowJar') + + class ReadFromSnowflake(beam.PTransform): - """An external PTransform which reads from Snowflake.""" + """ + An external PTransform which reads from Snowflake. + """ URN = 'beam:external:java:snowflake:read:v1' @@ -116,49 +132,50 @@ def __init__( query=None, expansion_service=None): """ - Initializes a read operation from Snowflake. - - Required parameters: - :param server_name: full Snowflake server name with the following format - account.region.gcp.snowflakecomputing.com. - :param schema: name of the Snowflake schema in the database to use. - :param database: name of the Snowflake database to use. - :param staging_bucket_name: name of the Google Cloud Storage bucket. - Bucket will be used as a temporary location for storing CSV files. - Those temporary directories will be named - `sf_copy_csv_DATE_TIME_RANDOMSUFFIX` - and they will be removed automatically once Read operation finishes. - :param storage_integration_name: is the name of storage integration - object created according to Snowflake documentation. - :param csv_mapper: specifies a function which must translate - user-defined object to array of strings. - SnowflakeIO uses a COPY INTO statement to move data from - a Snowflake table to Google Cloud Storage as CSV files.These files - are then downloaded via FileIO and processed line by line. - Each line is split into an array of Strings using the OpenCSV - The csv_mapper function job is to give the user the possibility to - convert the array of Strings to a user-defined type, - ie. GenericRecord for Avro or Parquet files, or custom objects. - Example: - ``` - def csv_mapper(strings_array): - return User(strings_array[0], int(strings_array[1]))) - ``` - :param table or query: specifies a Snowflake table name or - custom SQL query - :param expansion_service: specifies URL of expansion service. - - Authentication parameters: - It's required to pass one of the following combinations of parameters: - :param username and password: specifies username and password - for username/password authentication method. - :param private_key_path and private_key_passphrase: - specifies a private key file and password - for key/ pair authentication method. - :param o_auth_token: specifies access token for - OAuth authentication method. 
- """ - + Initializes a read operation from Snowflake. + + Required parameters: + + :param server_name: full Snowflake server name with the following format + account.region.gcp.snowflakecomputing.com. + :param schema: name of the Snowflake schema in the database to use. + :param database: name of the Snowflake database to use. + :param staging_bucket_name: name of the Google Cloud Storage bucket.:: + Bucket will be used as a temporary location for storing CSV files. + Those temporary directories will be named + 'sf_copy_csv_DATE_TIME_RANDOMSUFFIX' + and they will be removed automatically once Read operation finishes. + :param storage_integration_name: is the name of storage integration + object created according to Snowflake documentation. + :param csv_mapper: specifies a function which must translate + user-defined object to array of strings. + SnowflakeIO uses a COPY INTO statement to move data from + a Snowflake table to Google Cloud Storage as CSV files.These files + are then downloaded via FileIO and processed line by line. + Each line is split into an array of Strings using the OpenCSV + The csv_mapper function job is to give the user the possibility to + convert the array of Strings to a user-defined type, + ie. GenericRecord for Avro or Parquet files, or custom objects. + Example: + def csv_mapper(strings_array) + return User(strings_array[0], int(strings_array[1]))) + :param table: specifies a Snowflake table name. + :param query: specifies a Snowflake custom SQL query. + :param expansion_service: specifies URL of expansion service. + + Authentication parameters: + + :param username: specifies username for + username/password authentication method. + :param password: specifies password for + username/password authentication method. + :param private_key_path: specifies a private key file for + key/ pair authentication method. + :param private_key_passphrase: specifies password for + key/ pair authentication method. + :param o_auth_token: specifies access token for + OAuth authentication method. + """ self.params = ReadFromSnowflakeSchema( server_name=server_name, schema=schema,