Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
reading data by exporting to JSON files. This has small differences in behavior for Time and Date-related fields. See
Pydoc for more information.
* Add dispositions for SnowflakeIO.write ([BEAM-10343](https://issues.apache.org/jira/browse/BEAM-10343))
* Add cross-language support to SnowflakeIO.Read([BEAM-9897](https://issues.apache.org/jira/browse/BEAM-9897)).

## New Features / Improvements

Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/snowflake/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.snowflake')
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake"
ext.summary = "IO to read and write on Snowflake."
dependencies {
Expand Down
38 changes: 38 additions & 0 deletions sdks/java/io/snowflake/expansion-service/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"

applyJavaNature(enableChecker:false,
automaticModuleName: 'org.apache.beam.sdk.io.expansion.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)

description = "Apache Beam :: SDKs :: Java :: IO :: Snowflake ::Expansion Service"
ext.summary = "Expansion service serving Snowflake IO"

dependencies {
compile project(":sdks:java:expansion-service")
compile project(":sdks:java:io:snowflake")
runtime library.java.slf4j_jdk14
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,52 @@
package org.apache.beam.sdk.io.snowflake.credentials;

import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
import org.apache.beam.sdk.io.snowflake.crosslanguage.SnowflakeReadRegistrar;

/**
* Factory class for creating implementations of {@link SnowflakeCredentials} from {@link
* SnowflakePipelineOptions}.
*/
public class SnowflakeCredentialsFactory {
public static SnowflakeCredentials of(SnowflakePipelineOptions options) {
if (oauthOptionsAvailable(options)) {
return new OAuthTokenSnowflakeCredentials(options.getOauthToken());
} else if (usernamePasswordOptionsAvailable(options)) {
return new UsernamePasswordSnowflakeCredentials(options.getUsername(), options.getPassword());
} else if (keyPairOptionsAvailable(options)) {
public static SnowflakeCredentials of(SnowflakePipelineOptions o) {
if (oauthOptionsAvailable(o.getOauthToken())) {
return new OAuthTokenSnowflakeCredentials(o.getOauthToken());
} else if (usernamePasswordOptionsAvailable(o.getUsername(), o.getPassword())) {
return new UsernamePasswordSnowflakeCredentials(o.getUsername(), o.getPassword());
} else if (keyPairOptionsAvailable(
o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase())) {
return new KeyPairSnowflakeCredentials(
options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase());
o.getUsername(), o.getPrivateKeyPath(), o.getPrivateKeyPassphrase());
}
throw new RuntimeException("Can't get credentials from Options");
}

private static boolean oauthOptionsAvailable(SnowflakePipelineOptions options) {
return options.getOauthToken() != null && !options.getOauthToken().isEmpty();
public static SnowflakeCredentials of(SnowflakeReadRegistrar.ReadConfiguration c) {
if (oauthOptionsAvailable(c.getOAuthToken())) {
return new OAuthTokenSnowflakeCredentials(c.getOAuthToken());
} else if (usernamePasswordOptionsAvailable(c.getUsername(), c.getPassword())) {
return new UsernamePasswordSnowflakeCredentials(c.getUsername(), c.getPassword());
} else if (keyPairOptionsAvailable(
c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase())) {
return new KeyPairSnowflakeCredentials(
c.getUsername(), c.getPrivateKeyPath(), c.getPrivateKeyPassphrase());
}
throw new RuntimeException("Can't get credentials from Options");
}

private static boolean oauthOptionsAvailable(String token) {
return token != null && !token.isEmpty();
}

private static boolean usernamePasswordOptionsAvailable(SnowflakePipelineOptions options) {
return options.getUsername() != null
&& !options.getUsername().isEmpty()
&& !options.getPassword().isEmpty();
private static boolean usernamePasswordOptionsAvailable(String username, String password) {
return username != null && !username.isEmpty() && !password.isEmpty();
}

private static boolean keyPairOptionsAvailable(SnowflakePipelineOptions options) {
return options.getUsername() != null
&& !options.getUsername().isEmpty()
&& !options.getPrivateKeyPath().isEmpty()
&& !options.getPrivateKeyPassphrase().isEmpty();
private static boolean keyPairOptionsAvailable(
String username, String privateKeyPath, String privateKeyPassphrase) {
return username != null
&& !username.isEmpty()
&& !privateKeyPath.isEmpty()
&& !privateKeyPassphrase.isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.snowflake.crosslanguage;

/** Parameters abstract class to expose the transforms to an external SDK. */
public abstract class Configuration {
private String serverName;
private String username;
private String password;
private String privateKeyPath;
private String privateKeyPassphrase;
private String oAuthToken;
private String database;
private String schema;
private String table;
private String query;
private String stagingBucketName;
private String storageIntegrationName;

public String getServerName() {
return serverName;
}

public void setServerName(String serverName) {
this.serverName = serverName;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getPrivateKeyPath() {
return privateKeyPath;
}

public void setPrivateKeyPath(String privateKeyPath) {
this.privateKeyPath = privateKeyPath;
}

public String getPrivateKeyPassphrase() {
return privateKeyPassphrase;
}

public void setPrivateKeyPassphrase(String privateKeyPassphrase) {
this.privateKeyPassphrase = privateKeyPassphrase;
}

public String getOAuthToken() {
return oAuthToken;
}

public void setOAuthToken(String oAuthToken) {
this.oAuthToken = oAuthToken;
}

public String getDatabase() {
return database;
}

public void setDatabase(String database) {
this.database = database;
}

public String getSchema() {
return schema;
}

public void setSchema(String schema) {
this.schema = schema;
}

public String getTable() {
return table;
}

public void setTable(String table) {
this.table = table;
}

public String getQuery() {
return query;
}

public void setQuery(String query) {
this.query = query;
}

public String getStagingBucketName() {
return stagingBucketName;
}

public void setStagingBucketName(String stagingBucketName) {
this.stagingBucketName = stagingBucketName;
}

public String getStorageIntegrationName() {
return storageIntegrationName;
}

public void setStorageIntegrationName(String storageIntegrationName) {
this.storageIntegrationName = storageIntegrationName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.snowflake.crosslanguage;

import com.google.auto.service.AutoService;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials;
import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** Exposes {@link SnowflakeIO.Read} as an external transform for cross-language usage. */
@Experimental
@AutoService(ExternalTransformRegistrar.class)
public final class SnowflakeReadRegistrar implements ExternalTransformRegistrar {

public static final String URN = "beam:external:java:snowflake:read:v1";

@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
return ImmutableMap.of(URN, ReadBuilder.class);
}

/** Parameters class to expose the transform to an external SDK. */
public static class ReadConfiguration extends Configuration {}

public static class ReadBuilder
implements ExternalTransformBuilder<ReadConfiguration, PBegin, PCollection<byte[]>> {
public ReadBuilder() {}

@Override
public PTransform<PBegin, PCollection<byte[]>> buildExternal(ReadConfiguration c) {
SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(c);

SerializableFunction<Void, DataSource> dataSourceSerializableFunction =
SnowflakeIO.DataSourceProviderFromDataSourceConfiguration.of(
SnowflakeIO.DataSourceConfiguration.create(credentials)
.withServerName(c.getServerName())
.withDatabase(c.getDatabase())
.withSchema(c.getSchema()));

return SnowflakeIO.<byte[]>read()
.withStorageIntegrationName(c.getStorageIntegrationName())
.withStagingBucketName(c.getStagingBucketName())
.withDataSourceProviderFn(dataSourceSerializableFunction)
.withCsvMapper(CsvMapper.getCsvMapper())
.withCoder(ByteArrayCoder.of())
.fromTable(c.getTable())
.fromQuery(c.getQuery());
}
}

private static class CsvMapper implements Serializable {

public static SnowflakeIO.CsvMapper getCsvMapper() {
return (SnowflakeIO.CsvMapper<byte[]>)
parts -> {
String partsCSV = String.join(",", parts);

return partsCSV.getBytes(Charset.defaultCharset());
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Cross-language for SnowflakeIO. */
@Experimental(Kind.PORTABILITY)
@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.io.snowflake.crosslanguage;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
Loading