diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java new file mode 100644 index 000000000000..4cfb5e9eb070 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.io; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; + +/** Exception thrown when the location of a {@link SchemaIO} is invalid. 
*/ +@Internal +@Experimental(Kind.SCHEMAS) +public class InvalidLocationException extends IllegalArgumentException { + public InvalidLocationException(String msg) { + super(msg); + } + + public InvalidLocationException(String msg, Throwable cause) { + super(msg, cause); + } + + public InvalidLocationException(Throwable cause) { + super(cause); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java index c5044a5ff45e..e8292fbe629b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java @@ -80,7 +80,7 @@ public BeamSqlTable buildBeamSqlTable(Table tableDefinition) { } } - private BeamTableStatistics getTableStatistics(PipelineOptions options) { + protected BeamTableStatistics getTableStatistics(PipelineOptions options, SchemaIO schemaIO) { if (isBounded().equals(PCollection.IsBounded.BOUNDED)) { return BeamTableStatistics.BOUNDED_UNKNOWN; } @@ -123,7 +123,7 @@ public POutput buildIOWriter(PCollection input) { @Override public BeamTableStatistics getTableStatistics(PipelineOptions options) { - return SchemaIOTableProviderWrapper.this.getTableStatistics(options); + return SchemaIOTableProviderWrapper.this.getTableStatistics(options, schemaIO); } } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java deleted file mode 100644 index 7efdb133d7de..000000000000 --- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java +++ /dev/null @@ -1,435 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.meta.provider.datastore; - -import static com.google.datastore.v1.client.DatastoreHelper.makeKey; -import static com.google.datastore.v1.client.DatastoreHelper.makeValue; - -import com.alibaba.fastjson.JSONObject; -import com.google.datastore.v1.Entity; -import com.google.datastore.v1.Query; -import com.google.datastore.v1.Value; -import com.google.datastore.v1.Value.ValueTypeCase; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import java.io.Serializable; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.function.Supplier; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; -import 
org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable; -import org.apache.beam.sdk.extensions.sql.meta.Table; -import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException; -import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; -import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.Schema.TypeName; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Internal -@Experimental -class DataStoreV1Table extends SchemaBaseBeamTable implements Serializable { - public static final String KEY_FIELD_PROPERTY = "keyField"; - @VisibleForTesting static final String DEFAULT_KEY_FIELD = "__key__"; - private static final Logger LOG = LoggerFactory.getLogger(DataStoreV1Table.class); - // Should match: `projectId/kind`. - private static final Pattern locationPattern = Pattern.compile("(?.+)/(?.+)"); - @VisibleForTesting final String keyField; - @VisibleForTesting final String projectId; - @VisibleForTesting final String kind; - - DataStoreV1Table(Table table) { - super(table.getSchema()); - - // TODO: allow users to specify a name of the field to store a key value via TableProperties. 
- JSONObject properties = table.getProperties(); - if (properties.containsKey(KEY_FIELD_PROPERTY)) { - String field = properties.getString(KEY_FIELD_PROPERTY); - if (!(field != null && !field.isEmpty())) { - throw new InvalidTableException( - String.format("'%s' property cannot be null.", KEY_FIELD_PROPERTY)); - } - keyField = field; - } else { - keyField = DEFAULT_KEY_FIELD; - } - // TODO: allow users to specify a namespace in a location string. - String location = table.getLocation(); - if (location == null) { - throw new InvalidTableException("DataStoreV1 location must be set: " + table); - } - Matcher matcher = locationPattern.matcher(location); - - if (!matcher.matches()) { - throw new InvalidTableException( - "DataStoreV1 location must be in the following format: 'projectId/kind'" - + " but was:" - + location); - } - - this.projectId = matcher.group("projectId"); - this.kind = matcher.group("kind"); - } - - @Override - public PCollection buildIOReader(PBegin begin) { - Query.Builder q = Query.newBuilder(); - q.addKindBuilder().setName(kind); - Query query = q.build(); - - DatastoreV1.Read readInstance = - DatastoreIO.v1().read().withProjectId(projectId).withQuery(query); - - return begin - .apply("Read Datastore Entities", readInstance) - .apply("Convert Datastore Entities to Rows", EntityToRow.create(getSchema(), keyField)); - } - - @Override - public POutput buildIOWriter(PCollection input) { - return input - .apply("Convert Rows to Datastore Entities", RowToEntity.create(keyField, kind)) - .apply("Write Datastore Entities", DatastoreIO.v1().write().withProjectId(projectId)); - } - - @Override - public IsBounded isBounded() { - return IsBounded.BOUNDED; - } - - @Override - public BeamTableStatistics getTableStatistics(PipelineOptions options) { - long count = - DatastoreIO.v1().read().withProjectId(projectId).getNumEntities(options, kind, null); - - if (count < 0) { - return BeamTableStatistics.BOUNDED_UNKNOWN; - } - - return 
BeamTableStatistics.createBoundedTableStatistics((double) count); - } - - /** - * A {@code PTransform} to perform a conversion of {@code PCollection} to {@code - * PCollection}. - */ - public static class EntityToRow extends PTransform, PCollection> { - private final Schema schema; - private final String keyField; - - private EntityToRow(Schema schema, String keyField) { - this.schema = schema; - this.keyField = keyField; - - if (schema.getFieldNames().contains(keyField)) { - if (!schema.getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) { - throw new IllegalStateException( - "Field `" - + keyField - + "` should of type `VARBINARY`. Please change the type or specify a field to" - + " store the KEY value."); - } - LOG.info("Entity KEY will be stored under `" + keyField + "` field."); - } - } - - /** - * Create a PTransform instance. - * - * @param schema {@code Schema} of the target row. - * @param keyField A name of the row field to store the {@code Key} in. - * @return {@code PTransform} instance for Entity to Row conversion. - */ - public static EntityToRow create(Schema schema, String keyField) { - return new EntityToRow(schema, keyField); - } - - @Override - public PCollection expand(PCollection input) { - return input.apply(ParDo.of(new EntityToRowConverter())).setRowSchema(schema); - } - - @VisibleForTesting - class EntityToRowConverter extends DoFn { - - @DoFn.ProcessElement - public void processElement(ProcessContext context) { - Entity entity = context.element(); - ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); - mapBuilder.put(keyField, makeValue(entity.getKey()).build()); - mapBuilder.putAll(entity.getPropertiesMap()); - - context.output(extractRowFromProperties(schema, mapBuilder.build())); - } - - /** - * Convert DataStore {@code Value} to Beam type. - * - * @param currentFieldType Beam {@code Schema.FieldType} to convert to (used for {@code Row} - * and {@code Array}). - * @param val DataStore {@code Value}. 
- * @return resulting Beam type. - */ - private Object convertValueToObject(FieldType currentFieldType, Value val) { - ValueTypeCase typeCase = val.getValueTypeCase(); - - switch (typeCase) { - case NULL_VALUE: - case VALUETYPE_NOT_SET: - return null; - case BOOLEAN_VALUE: - return val.getBooleanValue(); - case INTEGER_VALUE: - return val.getIntegerValue(); - case DOUBLE_VALUE: - return val.getDoubleValue(); - case TIMESTAMP_VALUE: - com.google.protobuf.Timestamp time = val.getTimestampValue(); - long millis = time.getSeconds() * 1000 + time.getNanos() / 1000; - return Instant.ofEpochMilli(millis).toDateTime(); - case STRING_VALUE: - return val.getStringValue(); - case KEY_VALUE: - return val.getKeyValue().toByteArray(); - case BLOB_VALUE: - return val.getBlobValue().toByteArray(); - case ENTITY_VALUE: - // Recursive mapping for row type. - Schema rowSchema = currentFieldType.getRowSchema(); - assert rowSchema != null; - Entity entity = val.getEntityValue(); - return extractRowFromProperties(rowSchema, entity.getPropertiesMap()); - case ARRAY_VALUE: - // Recursive mapping for collection type. - FieldType elementType = currentFieldType.getCollectionElementType(); - List valueList = val.getArrayValue().getValuesList(); - return valueList.stream() - .map(v -> convertValueToObject(elementType, v)) - .collect(Collectors.toList()); - case GEO_POINT_VALUE: - default: - throw new IllegalStateException( - "No conversion exists from type: " - + val.getValueTypeCase().name() - + " to Beam type."); - } - } - - /** - * Converts all properties of an {@code Entity} to Beam {@code Row}. - * - * @param schema Target row {@code Schema}. - * @param values A map of property names and values. - * @return resulting Beam {@code Row}. - */ - private Row extractRowFromProperties(Schema schema, Map values) { - Row.Builder builder = Row.withSchema(schema); - // It is not a guarantee that the values will be in the same order as the schema. 
- // Maybe metadata: - // https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries - // TODO: figure out in what order the elements are in (without relying on Beam schema). - for (Schema.Field field : schema.getFields()) { - Value val = values.get(field.getName()); - builder.addValue(convertValueToObject(field.getType(), val)); - } - return builder.build(); - } - } - } - - /** - * A {@code PTransform} to perform a conversion of {@code PCollection} to {@code - * PCollection}. - */ - public static class RowToEntity extends PTransform, PCollection> { - private final Supplier keySupplier; - private final String kind; - private final String keyField; - - private RowToEntity(Supplier keySupplier, String kind, String keyField) { - this.keySupplier = keySupplier; - this.kind = kind; - this.keyField = keyField; - } - - @Override - public PCollection expand(PCollection input) { - boolean isFieldPresent = input.getSchema().getFieldNames().contains(keyField); - if (isFieldPresent) { - if (!input.getSchema().getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) { - throw new IllegalStateException( - "Field `" - + keyField - + "` should of type `VARBINARY`. Please change the type or specify a field to" - + " write the KEY value from via TableProperties."); - } - LOG.info("Field to use as Entity KEY is set to: `" + keyField + "`."); - } - return input.apply(ParDo.of(new RowToEntityConverter(isFieldPresent))); - } - - /** - * Create a PTransform instance. - * - * @param keyField Row field containing a serialized {@code Key}, must be set when using user - * specified keys. - * @param kind DataStore `Kind` data will be written to (required when generating random {@code - * Key}s). - * @return {@code PTransform} instance for Row to Entity conversion. 
- */ - public static RowToEntity create(String keyField, String kind) { - return new RowToEntity( - (Supplier & Serializable) () -> UUID.randomUUID().toString(), kind, keyField); - } - - @VisibleForTesting - static RowToEntity createTest(String keyString, String keyField, String kind) { - return new RowToEntity((Supplier & Serializable) () -> keyString, kind, keyField); - } - - @VisibleForTesting - class RowToEntityConverter extends DoFn { - private final boolean useNonRandomKey; - - RowToEntityConverter(boolean useNonRandomKey) { - super(); - this.useNonRandomKey = useNonRandomKey; - } - - @DoFn.ProcessElement - public void processElement(ProcessContext context) { - Row row = context.element(); - - Schema schemaWithoutKeyField = - Schema.builder() - .addFields( - row.getSchema().getFields().stream() - .filter(field -> !field.getName().equals(keyField)) - .collect(Collectors.toList())) - .build(); - Entity.Builder entityBuilder = constructEntityFromRow(schemaWithoutKeyField, row); - entityBuilder.setKey(constructKeyFromRow(row)); - - context.output(entityBuilder.build()); - } - - /** - * Converts an entire {@code Row} to an appropriate DataStore {@code Entity.Builder}. - * - * @param row {@code Row} to convert. - * @return resulting {@code Entity.Builder}. - */ - private Entity.Builder constructEntityFromRow(Schema schema, Row row) { - Entity.Builder entityBuilder = Entity.newBuilder(); - for (Schema.Field field : schema.getFields()) { - Value val = mapObjectToValue(row.getValue(field.getName())); - entityBuilder.putProperties(field.getName(), val); - } - return entityBuilder; - } - - /** - * Create a random key for a {@code Row} without a keyField or use a user-specified key by - * parsing it from byte array when keyField is set. - * - * @param row {@code Row} to construct a key for. - * @return resulting {@code Key}. 
- */ - private com.google.datastore.v1.Key constructKeyFromRow(Row row) { - if (!useNonRandomKey) { - // When key field is not present - use key supplier to generate a random one. - return makeKey(kind, keySupplier.get()).build(); - } - byte[] keyBytes = row.getBytes(keyField); - try { - return com.google.datastore.v1.Key.parseFrom(keyBytes); - } catch (InvalidProtocolBufferException e) { - throw new IllegalStateException("Failed to parse DataStore key from bytes."); - } - } - - /** - * Converts a {@code Row} value to an appropriate DataStore {@code Value} object. - * - * @param value {@code Row} value to convert. - * @throws IllegalStateException when no mapping function for object of given type exists. - * @return resulting {@code Value}. - */ - private Value mapObjectToValue(Object value) { - if (value == null) { - return Value.newBuilder().build(); - } - - if (Boolean.class.equals(value.getClass())) { - return makeValue((Boolean) value).build(); - } else if (Byte.class.equals(value.getClass())) { - return makeValue((Byte) value).build(); - } else if (Long.class.equals(value.getClass())) { - return makeValue((Long) value).build(); - } else if (Short.class.equals(value.getClass())) { - return makeValue((Short) value).build(); - } else if (Integer.class.equals(value.getClass())) { - return makeValue((Integer) value).build(); - } else if (Double.class.equals(value.getClass())) { - return makeValue((Double) value).build(); - } else if (Float.class.equals(value.getClass())) { - return makeValue((Float) value).build(); - } else if (String.class.equals(value.getClass())) { - return makeValue((String) value).build(); - } else if (Instant.class.equals(value.getClass())) { - return makeValue(((Instant) value).toDate()).build(); - } else if (byte[].class.equals(value.getClass())) { - return makeValue(ByteString.copyFrom((byte[]) value)).build(); - } else if (value instanceof Row) { - // Recursive conversion to handle nested rows. 
- Row row = (Row) value; - return makeValue(constructEntityFromRow(row.getSchema(), row)).build(); - } else if (value instanceof Collection) { - // Recursive to handle nested collections. - Collection collection = (Collection) value; - List arrayValues = - collection.stream().map(this::mapObjectToValue).collect(Collectors.toList()); - return makeValue(arrayValues).build(); - } - throw new IllegalStateException( - "No conversion exists from type: " + value.getClass() + " to DataStove Value."); - } - } - } -} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java index 9ecf6684d793..b579e06f3661 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java @@ -18,13 +18,21 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.datastore; import com.google.auto.service.AutoService; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.Table; -import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; +import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider; +import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import 
org.apache.beam.sdk.schemas.io.SchemaIOProvider; /** - * {@link TableProvider} for {@link DataStoreV1Table}. + * {@link TableProvider} for {@link DatastoreIO} for consumption by Beam SQL. + * + *

Passes the {@link DataStoreV1SchemaIOProvider} to the generalized table provider wrapper, + * {@link SchemaIOTableProviderWrapper}, for DataStoreV1 specific behavior. * *

A sample of DataStoreV1Table table is: * @@ -39,7 +47,11 @@ * } */ @AutoService(TableProvider.class) -public class DataStoreV1TableProvider extends InMemoryMetaTableProvider { +public class DataStoreV1TableProvider extends SchemaIOTableProviderWrapper { + @Override + public SchemaIOProvider getSchemaIOProvider() { + return new DataStoreV1SchemaIOProvider(); + } @Override public String getTableType() { @@ -47,7 +59,18 @@ public String getTableType() { } @Override - public BeamSqlTable buildBeamSqlTable(Table table) { - return new DataStoreV1Table(table); + public BeamTableStatistics getTableStatistics(PipelineOptions options, SchemaIO schemaIO) { + DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO; + long count = + DatastoreIO.v1() + .read() + .withProjectId(dataStoreV1SchemaIO.getProjectId()) + .getNumEntities(options, dataStoreV1SchemaIO.getKind(), null); + + if (count < 0) { + return BeamTableStatistics.BOUNDED_UNKNOWN; + } + + return BeamTableStatistics.createBoundedTableStatistics((double) count); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java index 32748468d7e5..c2c499a5cce6 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java @@ -38,11 +38,11 @@ import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; -import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow; -import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity; 
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1; +import org.apache.beam.sdk.io.gcp.datastore.EntityToRow; +import org.apache.beam.sdk.io.gcp.datastore.RowToEntity; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.testing.PAssert; diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java deleted file mode 100644 index 879add1f3e08..000000000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.sql.meta.provider.datastore; - -import static org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.DEFAULT_KEY_FIELD; -import static org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.KEY_FIELD_PROPERTY; -import static org.apache.beam.sdk.schemas.Schema.toSchema; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; - -import com.alibaba.fastjson.JSON; -import java.util.stream.Stream; -import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.Table; -import org.apache.beam.sdk.schemas.Schema; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class DataStoreTableProviderTest { - private DataStoreV1TableProvider provider = new DataStoreV1TableProvider(); - - @Test - public void testGetTableType() { - assertEquals("datastoreV1", provider.getTableType()); - } - - @Test - public void testBuildBeamSqlTable() { - final String location = "projectId/batch_kind"; - Table table = fakeTable("TEST", location); - BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - - assertNotNull(sqlTable); - assertTrue(sqlTable instanceof DataStoreV1Table); - - DataStoreV1Table datastoreTable = (DataStoreV1Table) sqlTable; - assertEquals("projectId", datastoreTable.projectId); - assertEquals("batch_kind", datastoreTable.kind); - assertEquals(DEFAULT_KEY_FIELD, datastoreTable.keyField); - } - - @Test - public void testTableProperty() { - final String location = "projectId/batch_kind"; - Table table = - fakeTableWithProperties("TEST", location, "{ " + KEY_FIELD_PROPERTY + ": \"field_name\" }"); - BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - - assertNotNull(sqlTable); - assertTrue(sqlTable instanceof DataStoreV1Table); - - 
DataStoreV1Table datastoreTable = (DataStoreV1Table) sqlTable; - assertEquals("projectId", datastoreTable.projectId); - assertEquals("batch_kind", datastoreTable.kind); - assertEquals("field_name", datastoreTable.keyField); - } - - @Test - public void testTableProperty_nullValue_throwsException() { - final String location = "projectId/batch_kind"; - Table table = fakeTableWithProperties("TEST", location, "{ " + KEY_FIELD_PROPERTY + ": \"\" }"); - assertThrows(IllegalArgumentException.class, () -> provider.buildBeamSqlTable(table)); - } - - private static Table fakeTable(String name, String location) { - return Table.builder() - .name(name) - .comment(name + " table") - .location(location) - .schema( - Stream.of( - Schema.Field.nullable("id", Schema.FieldType.INT32), - Schema.Field.nullable("name", Schema.FieldType.STRING)) - .collect(toSchema())) - .type("datastoreV1") - .build(); - } - - private static Table fakeTableWithProperties(String name, String location, String properties) { - return Table.builder() - .name(name) - .comment(name + " table") - .location(location) - .schema( - Stream.of( - Schema.Field.nullable("id", Schema.FieldType.INT32), - Schema.Field.nullable("name", Schema.FieldType.STRING)) - .collect(toSchema())) - .type("datastoreV1") - .properties(JSON.parseObject(properties)) - .build(); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java new file mode 100644 index 000000000000..216ccf47fca1 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.datastore; + +import com.google.auto.service.AutoService; +import com.google.datastore.v1.Query; +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.schemas.io.InvalidLocationException; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.schemas.io.SchemaIOProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; + +/** + * An implementation of {@link SchemaIOProvider} for reading and writing payloads with {@link + * DatastoreIO}. + */ +@Internal +@AutoService(SchemaIOProvider.class) +public class DataStoreV1SchemaIOProvider implements SchemaIOProvider { + public static final String KEY_FIELD_PROPERTY = "keyField"; + static final String DEFAULT_KEY_FIELD = "__key__"; + private static final Pattern locationPattern = Pattern.compile("(?.+)/(?.+)"); + + /** Returns an id that uniquely represents this IO. 
*/ + @Override + public String identifier() { + return "datastoreV1"; + } + + /** + * Returns the expected schema of the configuration object. Note this is distinct from the schema + * of the data source itself. + * + *

Configuration Parameters: + * + *

    + *
  • STRING keyField: The name of the Beam schema field to map the DataStore entity key. + * Defaults to {@code __key__} if not set or null. + *
+ */ + @Override + public Schema configurationSchema() { + // TODO: allow users to specify a name of the field to store a key value via TableProperties. + return Schema.builder().addNullableField(KEY_FIELD_PROPERTY, Schema.FieldType.STRING).build(); + } + + /** + * Produce a SchemaIO given a String representing the data's location, the schema of the data that + * resides there, and some IO-specific configuration object. + */ + @Override + public DataStoreV1SchemaIO from(String location, Row configuration, Schema dataSchema) { + return new DataStoreV1SchemaIO(location, configuration, dataSchema); + } + + @Override + public boolean requiresDataSchema() { + return true; + } + + @Override + public PCollection.IsBounded isBounded() { + return PCollection.IsBounded.BOUNDED; + } + + /** An abstraction to create schema aware IOs. */ + public static class DataStoreV1SchemaIO implements SchemaIO, Serializable { + protected final Schema dataSchema; + protected final String location; + protected final String kind; + protected final String projectId; + protected final String keyField; + + private DataStoreV1SchemaIO(String location, Row config, Schema dataSchema) { + this.location = location; + this.dataSchema = dataSchema; + this.keyField = determineKeyField(config.getString(KEY_FIELD_PROPERTY)); + + Matcher matcher = locationPattern.matcher(this.location); + validateLocation(location, matcher); + + this.kind = matcher.group("kind"); + this.projectId = matcher.group("projectId"); + } + + @Override + public Schema schema() { + return dataSchema; + } + + @Override + public PTransform> buildReader() { + return new PTransform>() { + @Override + public PCollection expand(PBegin begin) { + Query.Builder q = Query.newBuilder(); + q.addKindBuilder().setName(kind); + Query query = q.build(); + + DatastoreV1.Read readInstance = + DatastoreIO.v1().read().withProjectId(projectId).withQuery(query); + + return begin + .apply("Read Datastore Entities", readInstance) + .apply( + "Convert 
Datastore Entities to Rows", EntityToRow.create(dataSchema, keyField)); + } + }; + } + + @Override + public PTransform, POutput> buildWriter() { + return new PTransform, POutput>() { + @Override + public POutput expand(PCollection input) { + return input + .apply("Convert Rows to Datastore Entities", RowToEntity.create(keyField, kind)) + .apply("Write Datastore Entities", DatastoreIO.v1().write().withProjectId(projectId)); + } + }; + } + + public String getProjectId() { + return projectId; + } + + public String getKind() { + return kind; + } + + private String determineKeyField(String configKey) { + if (configKey != null && configKey.isEmpty()) { + throw new InvalidConfigurationException( + String.format("'%s' property cannot be null.", KEY_FIELD_PROPERTY)); + } else if (configKey != null) { + return configKey; + } else { + return DEFAULT_KEY_FIELD; + } + } + + private void validateLocation(String location, Matcher matcher) { + // TODO: allow users to specify a namespace in a location string. + if (location == null) { + throw new InvalidLocationException("DataStoreV1 location must be set. "); + } + if (!matcher.matches()) { + throw new InvalidLocationException( + "DataStoreV1 location must be in the following format: 'projectId/kind'" + + " but was:" + + location); + } + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java new file mode 100644 index 000000000000..b86323c1c568 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.datastore; + +import static com.google.datastore.v1.client.DatastoreHelper.makeValue; + +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.Value; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A {@code PTransform} to perform a conversion of {@link Entity} to {@link Row}. */ +public class EntityToRow extends PTransform, PCollection> { + private final Schema schema; + private final String keyField; + private static final Logger LOG = LoggerFactory.getLogger(DataStoreV1SchemaIOProvider.class); + + private EntityToRow(Schema schema, String keyField) { + this.schema = schema; + this.keyField = keyField; + + if (schema.getFieldNames().contains(keyField)) { + if (!schema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.BYTES)) { + throw new IllegalStateException( + "Field `" + + keyField + + "` should of type `BYTES`. 
Please change the type or specify a field to" + + " store the KEY value."); + } + LOG.info("Entity KEY will be stored under `" + keyField + "` field."); + } + } + + /** + * Create a PTransform instance. + * + * @param schema {@code Schema} of the target row. + * @param keyField A name of the row field to store the {@code Key} in. + * @return {@code PTransform} instance for Entity to Row conversion. + */ + public static EntityToRow create(Schema schema, String keyField) { + return new EntityToRow(schema, keyField); + } + + @Override + public PCollection expand(PCollection input) { + return input.apply(ParDo.of(new EntityToRow.EntityToRowConverter())).setRowSchema(schema); + } + + class EntityToRowConverter extends DoFn { + + @DoFn.ProcessElement + public void processElement(ProcessContext context) { + Entity entity = context.element(); + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + mapBuilder.put(keyField, makeValue(entity.getKey()).build()); + mapBuilder.putAll(entity.getPropertiesMap()); + + context.output(extractRowFromProperties(schema, mapBuilder.build())); + } + + /** + * Convert DataStore {@code Value} to Beam type. + * + * @param currentFieldType Beam {@code Schema.FieldType} to convert to (used for {@code Row} and + * {@code Array}). + * @param val DataStore {@code Value}. + * @return resulting Beam type. 
+ */ + private Object convertValueToObject(Schema.FieldType currentFieldType, Value val) { + Value.ValueTypeCase typeCase = val.getValueTypeCase(); + + switch (typeCase) { + case NULL_VALUE: + case VALUETYPE_NOT_SET: + return null; + case BOOLEAN_VALUE: + return val.getBooleanValue(); + case INTEGER_VALUE: + return val.getIntegerValue(); + case DOUBLE_VALUE: + return val.getDoubleValue(); + case TIMESTAMP_VALUE: + com.google.protobuf.Timestamp time = val.getTimestampValue(); + long millis = time.getSeconds() * 1000 + time.getNanos() / 1000; + return Instant.ofEpochMilli(millis).toDateTime(); + case STRING_VALUE: + return val.getStringValue(); + case KEY_VALUE: + return val.getKeyValue().toByteArray(); + case BLOB_VALUE: + return val.getBlobValue().toByteArray(); + case ENTITY_VALUE: + // Recursive mapping for row type. + Schema rowSchema = currentFieldType.getRowSchema(); + assert rowSchema != null; + Entity entity = val.getEntityValue(); + return extractRowFromProperties(rowSchema, entity.getPropertiesMap()); + case ARRAY_VALUE: + // Recursive mapping for collection type. + Schema.FieldType elementType = currentFieldType.getCollectionElementType(); + List valueList = val.getArrayValue().getValuesList(); + return valueList.stream() + .map(v -> convertValueToObject(elementType, v)) + .collect(Collectors.toList()); + case GEO_POINT_VALUE: + default: + throw new IllegalStateException( + "No conversion exists from type: " + + val.getValueTypeCase().name() + + " to Beam type."); + } + } + + /** + * Converts all properties of an {@code Entity} to Beam {@code Row}. + * + * @param schema Target row {@code Schema}. + * @param values A map of property names and values. + * @return resulting Beam {@code Row}. + */ + private Row extractRowFromProperties(Schema schema, Map values) { + Row.Builder builder = Row.withSchema(schema); + // It is not a guarantee that the values will be in the same order as the schema. 
+ // Maybe metadata: + // https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries + // TODO: figure out in what order the elements are in (without relying on Beam schema). + for (Schema.Field field : schema.getFields()) { + Value val = values.get(field.getName()); + builder.addValue(convertValueToObject(field.getType(), val)); + } + return builder.build(); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java new file mode 100644 index 000000000000..02ca67ce43cd --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.datastore; + +import static com.google.datastore.v1.client.DatastoreHelper.makeKey; +import static com.google.datastore.v1.client.DatastoreHelper.makeValue; + +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.Value; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A {@code PTransform} to perform a conversion of {@link Row} to {@link Entity}. */ +public class RowToEntity extends PTransform, PCollection> { + private final Supplier keySupplier; + private final String kind; + private final String keyField; + private static final Logger LOG = LoggerFactory.getLogger(DataStoreV1SchemaIOProvider.class); + + private RowToEntity(Supplier keySupplier, String kind, String keyField) { + this.keySupplier = keySupplier; + this.kind = kind; + this.keyField = keyField; + } + + @Override + public PCollection expand(PCollection input) { + boolean isFieldPresent = input.getSchema().getFieldNames().contains(keyField); + if (isFieldPresent) { + if (!input + .getSchema() + .getField(keyField) + .getType() + .getTypeName() + .equals(Schema.TypeName.BYTES)) { + throw new IllegalStateException( + "Field `" + + keyField + + "` should of type `VARBINARY`. 
Please change the type or specify a field to" + + " write the KEY value from via TableProperties."); + } + LOG.info("Field to use as Entity KEY is set to: `" + keyField + "`."); + } + return input.apply(ParDo.of(new RowToEntity.RowToEntityConverter(isFieldPresent))); + } + + /** + * Create a PTransform instance. + * + * @param keyField Row field containing a serialized {@code Key}, must be set when using user + * specified keys. + * @param kind DataStore `Kind` data will be written to (required when generating random {@code + * Key}s). + * @return {@code PTransform} instance for Row to Entity conversion. + */ + public static RowToEntity create(String keyField, String kind) { + return new RowToEntity( + (Supplier & Serializable) () -> UUID.randomUUID().toString(), kind, keyField); + } + + public static RowToEntity createTest(String keyString, String keyField, String kind) { + return new RowToEntity((Supplier & Serializable) () -> keyString, kind, keyField); + } + + class RowToEntityConverter extends DoFn { + private final boolean useNonRandomKey; + + RowToEntityConverter(boolean useNonRandomKey) { + super(); + this.useNonRandomKey = useNonRandomKey; + } + + @DoFn.ProcessElement + public void processElement(ProcessContext context) { + Row row = context.element(); + + Schema schemaWithoutKeyField = + Schema.builder() + .addFields( + row.getSchema().getFields().stream() + .filter(field -> !field.getName().equals(keyField)) + .collect(Collectors.toList())) + .build(); + Entity.Builder entityBuilder = constructEntityFromRow(schemaWithoutKeyField, row); + entityBuilder.setKey(constructKeyFromRow(row)); + + context.output(entityBuilder.build()); + } + + /** + * Converts an entire {@code Row} to an appropriate DataStore {@code Entity.Builder}. + * + * @param row {@code Row} to convert. + * @return resulting {@code Entity.Builder}. 
+ */ + private Entity.Builder constructEntityFromRow(Schema schema, Row row) { + Entity.Builder entityBuilder = Entity.newBuilder(); + for (Schema.Field field : schema.getFields()) { + Value val = mapObjectToValue(row.getValue(field.getName())); + entityBuilder.putProperties(field.getName(), val); + } + return entityBuilder; + } + + /** + * Create a random key for a {@code Row} without a keyField or use a user-specified key by + * parsing it from byte array when keyField is set. + * + * @param row {@code Row} to construct a key for. + * @return resulting {@code Key}. + */ + private com.google.datastore.v1.Key constructKeyFromRow(Row row) { + if (!useNonRandomKey) { + // When key field is not present - use key supplier to generate a random one. + return makeKey(kind, keySupplier.get()).build(); + } + byte[] keyBytes = row.getBytes(keyField); + try { + return com.google.datastore.v1.Key.parseFrom(keyBytes); + } catch (InvalidProtocolBufferException e) { + throw new IllegalStateException("Failed to parse DataStore key from bytes."); + } + } + + /** + * Converts a {@code Row} value to an appropriate DataStore {@code Value} object. + * + * @param value {@code Row} value to convert. + * @return resulting {@code Value}. + * @throws IllegalStateException when no mapping function for object of given type exists. 
+ */ + private Value mapObjectToValue(Object value) { + if (value == null) { + return Value.newBuilder().build(); + } + + if (Boolean.class.equals(value.getClass())) { + return makeValue((Boolean) value).build(); + } else if (Byte.class.equals(value.getClass())) { + return makeValue((Byte) value).build(); + } else if (Long.class.equals(value.getClass())) { + return makeValue((Long) value).build(); + } else if (Short.class.equals(value.getClass())) { + return makeValue((Short) value).build(); + } else if (Integer.class.equals(value.getClass())) { + return makeValue((Integer) value).build(); + } else if (Double.class.equals(value.getClass())) { + return makeValue((Double) value).build(); + } else if (Float.class.equals(value.getClass())) { + return makeValue((Float) value).build(); + } else if (String.class.equals(value.getClass())) { + return makeValue((String) value).build(); + } else if (Instant.class.equals(value.getClass())) { + return makeValue(((Instant) value).toDate()).build(); + } else if (byte[].class.equals(value.getClass())) { + return makeValue(ByteString.copyFrom((byte[]) value)).build(); + } else if (value instanceof Row) { + // Recursive conversion to handle nested rows. + Row row = (Row) value; + return makeValue(constructEntityFromRow(row.getSchema(), row)).build(); + } else if (value instanceof Collection) { + // Recursive to handle nested collections. 
+ Collection collection = + List arrayValues = + collection.stream().map(this::mapObjectToValue).collect(Collectors.toList()); + return makeValue(arrayValues).build(); + } + throw new IllegalStateException( + "No conversion exists from type: " + value.getClass() + " to DataStore Value."); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java new file mode 100644 index 000000000000..147474496e7b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.values.Row; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DataStoreV1SchemaIOProviderTest { + static final String DEFAULT_KEY_FIELD = "__key__"; + public static final String KEY_FIELD_PROPERTY = "keyField"; + private DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider(); + + @Test + public void testGetTableType() { + assertEquals("datastoreV1", provider.identifier()); + } + + @Test + public void testBuildBeamSqlTable() { + final String location = "projectId/batch_kind"; + + Row configuration = + Row.withSchema(provider.configurationSchema()) + .withFieldValue(KEY_FIELD_PROPERTY, null) + .build(); + SchemaIO schemaIO = provider.from(location, configuration, generateDataSchema()); + + assertNotNull(schemaIO); + assertTrue(schemaIO instanceof DataStoreV1SchemaIO); + + DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO; + assertEquals("projectId", dataStoreV1SchemaIO.projectId); + assertEquals("batch_kind", dataStoreV1SchemaIO.kind); + assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField); + } + + @Test + public void testTableProperty() { + final String location = "projectId/batch_kind"; + + Row configuration = + Row.withSchema(provider.configurationSchema()) + .withFieldValue(KEY_FIELD_PROPERTY, "field_name") + .build(); + SchemaIO schemaIO = provider.from(location, configuration, generateDataSchema()); + + assertNotNull(schemaIO); + assertTrue(schemaIO instanceof DataStoreV1SchemaIO); + + 
DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO; + assertEquals("projectId", dataStoreV1SchemaIO.projectId); + assertEquals("batch_kind", dataStoreV1SchemaIO.kind); + assertEquals("field_name", dataStoreV1SchemaIO.keyField); + } + + @Test + public void testTableProperty_nullValue_throwsException() { + final String location = "projectId/batch_kind"; + + Row configuration = + Row.withSchema(provider.configurationSchema()) + .withFieldValue(KEY_FIELD_PROPERTY, "") + .build(); + + assertThrows( + IllegalArgumentException.class, + () -> + (new DataStoreV1SchemaIOProvider()) + .from(location, configuration, generateDataSchema())); + } + + private static Schema generateDataSchema() { + return Schema.builder() + .addNullableField("id", Schema.FieldType.INT32) + .addNullableField("name", Schema.FieldType.STRING) + .build(); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java similarity index 90% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java index a682b2a936ca..32090bc16560 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java @@ -15,13 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.extensions.sql.meta.provider.datastore; +package org.apache.beam.sdk.io.gcp.datastore; import static com.google.datastore.v1.client.DatastoreHelper.makeKey; import static com.google.datastore.v1.client.DatastoreHelper.makeValue; -import static org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARBINARY; -import static org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.DEFAULT_KEY_FIELD; -import static org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone; import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN; import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTES; import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME; @@ -39,9 +36,6 @@ import java.util.Collections; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow; -import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.testing.PAssert; @@ -50,17 +44,20 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class DataStoreTableTest { +public class EntityToRowRowToEntityTest { private static final String KIND = "kind"; private static final String UUID_VALUE = UUID.randomUUID().toString(); private static final Key.Builder KEY = makeKey(KIND, UUID_VALUE); private static final DateTime DATE_TIME = parseTimestampWithUTCTimeZone("2018-05-28 20:17:40"); + static final String DEFAULT_KEY_FIELD = "__key__"; + 
private static final FieldType VARBINARY = FieldType.BYTES; private static final Schema NESTED_ROW_SCHEMA = Schema.builder().addNullableField("nestedLong", INT64).build(); @@ -74,7 +71,7 @@ public class DataStoreTableTest { .addNullableField("rowArray", array(FieldType.row(NESTED_ROW_SCHEMA))) .addNullableField("double", DOUBLE) .addNullableField("bytes", BYTES) - .addNullableField("string", CalciteUtils.CHAR) + .addNullableField("string", STRING) .addNullableField("nullable", INT64) .build(); private static final Entity NESTED_ENTITY = @@ -187,4 +184,12 @@ public void testRowToEntityConverterWithoutKey() { private static Row row(Schema schema, Object... values) { return Row.withSchema(schema).addValues(values).build(); } + + public static DateTime parseTimestampWithUTCTimeZone(String str) { + if (str.indexOf('.') == -1) { + return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC().parseDateTime(str); + } else { + return DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").withZoneUTC().parseDateTime(str); + } + } }