diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java new file mode 100644 index 000000000000..148c7260d0bf --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/DateTime.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; + +/** + * A datetime without a time-zone. + * + *

It cannot represent an instant on the time-line without additional information such as an + * offset or time-zone. + * + *

Its input type is a {@link LocalDateTime}, and base type is a {@link Row} containing Date + * field and Time field. Date field is the same as the base type of {@link Date}, which is a Long + * that represents incrementing count of days where day 0 is 1970-01-01 (ISO). Time field is the + * same as the base type of {@link Time}, which is a Long that represents a count of time in + * nanoseconds. + */ +public class DateTime implements Schema.LogicalType { + public static final String DATE_FIELD_NAME = "Date"; + public static final String TIME_FIELD_NAME = "Time"; + public static final Schema DATETIME_SCHEMA = + Schema.builder().addInt64Field(DATE_FIELD_NAME).addInt64Field(TIME_FIELD_NAME).build(); + + @Override + public String getIdentifier() { + return "beam:logical_type:datetime:v1"; + } + + // unused + @Override + public Schema.FieldType getArgumentType() { + return Schema.FieldType.STRING; + } + + // unused + @Override + public String getArgument() { + return ""; + } + + @Override + public Schema.FieldType getBaseType() { + return Schema.FieldType.row(DATETIME_SCHEMA); + } + + @Override + public Row toBaseType(LocalDateTime input) { + return input == null + ? null + : Row.withSchema(DATETIME_SCHEMA) + .addValues(input.toLocalDate().toEpochDay(), input.toLocalTime().toNanoOfDay()) + .build(); + } + + @Override + public LocalDateTime toInputType(Row base) { + return base == null + ? null + : LocalDateTime.of( + LocalDate.ofEpochDay(base.getInt64(DATE_FIELD_NAME)), + LocalTime.ofNanoOfDay(base.getInt64(TIME_FIELD_NAME))); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java index d22b77a15bac..ef6a68a8fb3a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java @@ -18,8 +18,10 @@ package org.apache.beam.sdk.schemas.logicaltypes; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import org.apache.beam.sdk.schemas.Schema.LogicalType; +import org.apache.beam.sdk.values.Row; /** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType}s corresponding to SQL data types. */ public class SqlTypes { @@ -31,4 +33,7 @@ private SqlTypes() {} /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */ public static final LogicalType TIME = new Time(); + + /** Beam LogicalType corresponding to ZetaSQL DATETIME type. */ + public static final LogicalType DATETIME = new DateTime(); } diff --git a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java index 4604a00520ec..42efbbda9ce1 100644 --- a/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java +++ b/sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datacatalog/SchemaUtils.java @@ -39,7 +39,7 @@ class SchemaUtils { .put("BOOL", FieldType.BOOLEAN) .put("BYTES", FieldType.BYTES) .put("DATE", FieldType.logicalType(SqlTypes.DATE)) - .put("DATETIME", FieldType.DATETIME) + .put("DATETIME", FieldType.logicalType(SqlTypes.DATETIME)) .put("DOUBLE", FieldType.DOUBLE) .put("FLOAT", FieldType.DOUBLE) .put("FLOAT64", FieldType.DOUBLE) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java index 1d234b0950b7..445242200af9 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRelDataTypeSystem.java @@ -48,6 +48,8 @@ public int getMaxPrecision(SqlTypeName typeName) { switch (typeName) { case TIME: return 6; // support microsecond time precision + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return 6; // support microsecond datetime precision default: return super.getMaxPrecision(typeName); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index b6d606b0983e..d9ad401a81d3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -27,6 +27,7 @@ import java.lang.reflect.Type; import java.math.BigDecimal; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.util.AbstractList; import java.util.AbstractMap; @@ -35,13 +36,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TimeZone; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType; -import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimestampWithLocalTzType; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.DateTime; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -92,6 +94,7 @@ public class BeamCalcRel extends AbstractBeamCalcRel { private static final long NANOS_PER_MILLISECOND = 1000000L; + private static final long MILLIS_PER_DAY = 86400000L; private static final ParameterExpression outputSchemaParam = Expressions.parameter(Schema.class, "outputSchema"); @@ -344,6 +347,18 @@ private static Expression castOutputTime(Expression value, FieldType toType) { valueDateTime = Expressions.unbox(valueDateTime); } valueDateTime = Expressions.call(LocalDate.class, "ofEpochDay", valueDateTime); + } else if (CalciteUtils.TIMESTAMP_WITH_LOCAL_TZ.typesEqual(toType) + || CalciteUtils.NULLABLE_TIMESTAMP_WITH_LOCAL_TZ.typesEqual(toType)) { + // Convert TimeStamp_With_Local_TimeZone to LocalDateTime + Expression dateValue = + Expressions.divide(valueDateTime, Expressions.constant(MILLIS_PER_DAY)); + Expression date = Expressions.call(LocalDate.class, "ofEpochDay", dateValue); + Expression timeValue = + Expressions.multiply( + Expressions.modulo(valueDateTime, Expressions.constant(MILLIS_PER_DAY)), + Expressions.constant(NANOS_PER_MILLISECOND)); + Expression time = Expressions.call(LocalTime.class, "ofNanoOfDay", timeValue); + valueDateTime = Expressions.call(LocalDateTime.class, "of", date, time); } else { throw new UnsupportedOperationException("Unknown DateTime type " + toType); } @@ -385,7 +400,7 @@ private static class InputGetterImpl implements RexToLixTranslator.InputGetter { .put(SqlTypes.DATE.getIdentifier(), Long.class) .put(SqlTypes.TIME.getIdentifier(), Long.class) .put(TimeWithLocalTzType.IDENTIFIER, ReadableInstant.class) - .put(TimestampWithLocalTzType.IDENTIFIER, ReadableInstant.class) + .put(SqlTypes.DATETIME.getIdentifier(), Row.class) .put(CharType.IDENTIFIER, String.class) .build(); @@ -442,6 +457,16 @@ private static Expression value(Expression value, Schema.FieldType type) { value, Expressions.divide(value, Expressions.constant(NANOS_PER_MILLISECOND))); } else if (SqlTypes.DATE.getIdentifier().equals(logicalId)) { return value; + } else if (SqlTypes.DATETIME.getIdentifier().equals(logicalId)) { + Expression dateValue = + Expressions.call(value, "getInt64", Expressions.constant(DateTime.DATE_FIELD_NAME)); + Expression timeValue = + Expressions.call(value, "getInt64", Expressions.constant(DateTime.TIME_FIELD_NAME)); + Expression returnValue = + Expressions.add( + Expressions.multiply(dateValue, Expressions.constant(MILLIS_PER_DAY)), + Expressions.divide(timeValue, Expressions.constant(NANOS_PER_MILLISECOND))); + return nullOr(value, returnValue); } else if (!CharType.IDENTIFIER.equals(logicalId)) { throw new UnsupportedOperationException( "Unknown LogicalType " + type.getLogicalType().getIdentifier()); @@ -563,6 +588,8 @@ public Object get(String name) { || name.equals(DataContext.Variable.CURRENT_TIMESTAMP.camelName) || name.equals(DataContext.Variable.LOCAL_TIMESTAMP.camelName)) { return System.currentTimeMillis(); + } else if (name.equals(Variable.TIME_ZONE.camelName)) { + return TimeZone.getDefault(); } return null; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index acb4ee170dac..2044ed490ec3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -53,15 +53,6 @@ public TimeWithLocalTzType() { } } - /** A LogicalType corresponding to TIMESTAMP_WITH_LOCAL_TIME_ZONE. */ - public static class TimestampWithLocalTzType extends PassThroughLogicalType { - public static final String IDENTIFIER = "SqlTimestampWithLocalTzType"; - - public TimestampWithLocalTzType() { - super(IDENTIFIER, FieldType.STRING, "", FieldType.DATETIME); - } - } - /** A LogicalType corresponding to CHAR. */ public static class CharType extends PassThroughLogicalType { public static final String IDENTIFIER = "SqlCharType"; @@ -82,7 +73,7 @@ public static boolean isDateTimeType(FieldType fieldType) { return logicalId.equals(SqlTypes.DATE.getIdentifier()) || logicalId.equals(SqlTypes.TIME.getIdentifier()) || logicalId.equals(TimeWithLocalTzType.IDENTIFIER) - || logicalId.equals(TimestampWithLocalTzType.IDENTIFIER); + || logicalId.equals(SqlTypes.DATETIME.getIdentifier()); } return false; } @@ -121,8 +112,9 @@ public static boolean isStringType(FieldType fieldType) { FieldType.logicalType(new TimeWithLocalTzType()); public static final FieldType TIMESTAMP = FieldType.DATETIME; public static final FieldType NULLABLE_TIMESTAMP = FieldType.DATETIME.withNullable(true); - public static final FieldType TIMESTAMP_WITH_LOCAL_TZ = - FieldType.logicalType(new TimestampWithLocalTzType()); + public static final FieldType TIMESTAMP_WITH_LOCAL_TZ = FieldType.logicalType(SqlTypes.DATETIME); + public static final FieldType NULLABLE_TIMESTAMP_WITH_LOCAL_TZ = + FieldType.logicalType(SqlTypes.DATETIME).withNullable(true); private static final BiMap BEAM_TO_CALCITE_TYPE_MAPPING = ImmutableBiMap.builder() diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java index 7aa5032c146b..f107dc31d3aa 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java @@ -38,6 +38,7 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.BitString; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString; import org.checkerframework.checker.nullness.qual.Nullable; public class BeamSqlUnparseContext extends SqlImplementor.SimpleContext { @@ -69,8 +70,12 @@ public BeamSqlUnparseContext(IntFunction field) { public SqlNode toSql(RexProgram program, RexNode rex) { if (rex.getKind().equals(SqlKind.LITERAL)) { final RexLiteral literal = (RexLiteral) rex; - SqlTypeFamily family = literal.getTypeName().getFamily(); - if (SqlTypeFamily.BINARY.equals(family)) { + SqlTypeName name = literal.getTypeName(); + SqlTypeFamily family = name.getFamily(); + if (SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.equals(name)) { + TimestampString timestampString = literal.getValueAs(TimestampString.class); + return new SqlDateTimeLiteral(timestampString, POS); + } else if (SqlTypeFamily.BINARY.equals(family)) { ByteString byteString = literal.getValueAs(ByteString.class); BitString bitString = BitString.createFromHexString(byteString.toString(16)); return new SqlByteStringLiteral(bitString, POS); @@ -92,6 +97,21 @@ public SqlNode toSql(RexProgram program, RexNode rex) { return super.toSql(program, rex); } + private static class SqlDateTimeLiteral extends SqlLiteral { + + private final TimestampString timestampString; + + SqlDateTimeLiteral(TimestampString timestampString, SqlParserPos pos) { + super(timestampString, SqlTypeName.TIMESTAMP, pos); + this.timestampString = timestampString; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.literal("DATETIME '" + timestampString.toString() + "'"); + } + } + private static class SqlByteStringLiteral extends SqlLiteral { SqlByteStringLiteral(BitString bytes, SqlParserPos pos) { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java index 1b1641facc9c..0519798d65fd 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Arrays; import java.util.HashMap; @@ -368,22 +369,135 @@ public void testNullInnerRow() { } @Test - public void testNullDatetimeFields() { + public void testDatetimeFields() { Instant current = new Instant(1561671380000L); // Long value corresponds to 27/06/2019 Schema dateTimeFieldSchema = Schema.builder() .addField("dateTimeField", FieldType.DATETIME) .addNullableField("nullableDateTimeField", FieldType.DATETIME) - .addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME)) - .addNullableField("nullableTimeTypeField", FieldType.logicalType(SqlTypes.TIME)) + .build(); + + Row dateTimeRow = Row.withSchema(dateTimeFieldSchema).addValues(current, null).build(); + + PCollection outputRow = + pipeline + .apply(Create.of(dateTimeRow)) + .setRowSchema(dateTimeFieldSchema) + .apply( + SqlTransform.query( + "select EXTRACT(YEAR from dateTimeField) as yyyy, " + + " EXTRACT(YEAR from nullableDateTimeField) as year_with_null, " + + " EXTRACT(MONTH from dateTimeField) as mm, " + + " EXTRACT(MONTH from nullableDateTimeField) as month_with_null " + + " from PCOLLECTION")); + + Schema outputRowSchema = + Schema.builder() + .addField("yyyy", FieldType.INT64) + .addNullableField("year_with_null", FieldType.INT64) + .addField("mm", FieldType.INT64) + .addNullableField("month_with_null", FieldType.INT64) + .build(); + + PAssert.that(outputRow) + .containsInAnyOrder( + Row.withSchema(outputRowSchema).addValues(2019L, null, 06L, null).build()); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testSqlLogicalTypeDateFields() { + Schema dateTimeFieldSchema = + Schema.builder() .addField("dateTypeField", FieldType.logicalType(SqlTypes.DATE)) .addNullableField("nullableDateTypeField", FieldType.logicalType(SqlTypes.DATE)) .build(); + Row dateRow = + Row.withSchema(dateTimeFieldSchema).addValues(LocalDate.of(2019, 6, 27), null).build(); + + PCollection outputRow = + pipeline + .apply(Create.of(dateRow)) + .setRowSchema(dateTimeFieldSchema) + .apply( + SqlTransform.query( + "select EXTRACT(DAY from dateTypeField) as dd, " + + " EXTRACT(DAY from nullableDateTypeField) as day_with_null, " + + " dateTypeField + interval '1' day as date_with_day_added, " + + " nullableDateTypeField + interval '1' day as day_added_with_null " + + " from PCOLLECTION")); + + Schema outputRowSchema = + Schema.builder() + .addField("dd", FieldType.INT64) + .addNullableField("day_with_null", FieldType.INT64) + .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATE)) + .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATE)) + .build(); + + PAssert.that(outputRow) + .containsInAnyOrder( + Row.withSchema(outputRowSchema) + .addValues(27L, null, LocalDate.of(2019, 6, 28), null) + .build()); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testSqlLogicalTypeTimeFields() { + Schema dateTimeFieldSchema = + Schema.builder() + .addField("timeTypeField", FieldType.logicalType(SqlTypes.TIME)) + .addNullableField("nullableTimeTypeField", FieldType.logicalType(SqlTypes.TIME)) + .build(); + + Row timeRow = + Row.withSchema(dateTimeFieldSchema).addValues(LocalTime.of(1, 0, 0), null).build(); + + PCollection outputRow = + pipeline + .apply(Create.of(timeRow)) + .setRowSchema(dateTimeFieldSchema) + .apply( + SqlTransform.query( + "select timeTypeField + interval '1' hour as time_with_hour_added, " + + " nullableTimeTypeField + interval '1' hour as hour_added_with_null, " + + " timeTypeField - INTERVAL '60' SECOND as time_with_seconds_added, " + + " nullableTimeTypeField - INTERVAL '60' SECOND as seconds_added_with_null " + + " from PCOLLECTION")); + + Schema outputRowSchema = + Schema.builder() + .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.TIME)) + .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.TIME)) + .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.TIME)) + .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.TIME)) + .build(); + + PAssert.that(outputRow) + .containsInAnyOrder( + Row.withSchema(outputRowSchema) + .addValues(LocalTime.of(2, 0, 0), null, LocalTime.of(0, 59, 0), null) + .build()); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(2)); + } + + @Test + public void testSqlLogicalTypeDatetimeFields() { + Schema dateTimeFieldSchema = + Schema.builder() + .addField("dateTimeField", FieldType.logicalType(SqlTypes.DATETIME)) + .addNullableField("nullableDateTimeField", FieldType.logicalType(SqlTypes.DATETIME)) + .build(); + Row dateTimeRow = Row.withSchema(dateTimeFieldSchema) - .addValues(current, null, LocalTime.of(1, 0, 0), null, LocalDate.of(2019, 6, 27), null) + .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0), null) .build(); PCollection outputRow = @@ -396,14 +510,14 @@ public void testNullDatetimeFields() { + " EXTRACT(YEAR from nullableDateTimeField) as year_with_null, " + " EXTRACT(MONTH from dateTimeField) as mm, " + " EXTRACT(MONTH from nullableDateTimeField) as month_with_null, " - + " timeTypeField + interval '1' hour as time_with_hour_added, " - + " nullableTimeTypeField + interval '1' hour as hour_added_with_null, " - + " timeTypeField - INTERVAL '60' SECOND as time_with_seconds_added, " - + " nullableTimeTypeField - INTERVAL '60' SECOND as seconds_added_with_null, " - + " EXTRACT(DAY from dateTypeField) as dd, " - + " EXTRACT(DAY from nullableDateTypeField) as day_with_null, " - + " dateTypeField + interval '1' day as date_with_day_added, " - + " nullableDateTypeField + interval '1' day as day_added_with_null " + + " dateTimeField + interval '1' hour as time_with_hour_added, " + + " nullableDateTimeField + interval '1' hour as hour_added_with_null, " + + " dateTimeField - INTERVAL '60' SECOND as time_with_seconds_added, " + + " nullableDateTimeField - INTERVAL '60' SECOND as seconds_added_with_null, " + + " EXTRACT(DAY from dateTimeField) as dd, " + + " EXTRACT(DAY from nullableDateTimeField) as day_with_null, " + + " dateTimeField + interval '1' day as date_with_day_added, " + + " nullableDateTimeField + interval '1' day as day_added_with_null " + " from PCOLLECTION")); Schema outputRowSchema = @@ -412,31 +526,31 @@ public void testNullDatetimeFields() { .addNullableField("year_with_null", FieldType.INT64) .addField("mm", FieldType.INT64) .addNullableField("month_with_null", FieldType.INT64) - .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.TIME)) - .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.TIME)) - .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.TIME)) - .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.TIME)) + .addField("time_with_hour_added", FieldType.logicalType(SqlTypes.DATETIME)) + .addNullableField("hour_added_with_null", FieldType.logicalType(SqlTypes.DATETIME)) + .addField("time_with_seconds_added", FieldType.logicalType(SqlTypes.DATETIME)) + .addNullableField("seconds_added_with_null", FieldType.logicalType(SqlTypes.DATETIME)) .addField("dd", FieldType.INT64) .addNullableField("day_with_null", FieldType.INT64) - .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATE)) - .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATE)) + .addField("date_with_day_added", FieldType.logicalType(SqlTypes.DATETIME)) + .addNullableField("day_added_with_null", FieldType.logicalType(SqlTypes.DATETIME)) .build(); PAssert.that(outputRow) .containsInAnyOrder( Row.withSchema(outputRowSchema) .addValues( - 2019L, + 2008L, null, - 06L, + 12L, null, - LocalTime.of(2, 0, 0), + LocalDateTime.of(2008, 12, 25, 16, 30, 0), null, - LocalTime.of(0, 59, 0), + LocalDateTime.of(2008, 12, 25, 15, 29, 0), null, - 27L, + 25L, null, - LocalDate.of(2019, 6, 28), + LocalDateTime.of(2008, 12, 26, 15, 30, 0), null) .build()); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java index f0854bcfff8f..d4819fc966a6 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/BeamSqlRowCoderTest.java @@ -19,6 +19,7 @@ import java.math.BigDecimal; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; @@ -51,6 +52,7 @@ public void encodeAndDecode() throws Exception { .add("col_string_varchar", SqlTypeName.VARCHAR) .add("col_time", SqlTypeName.TIME) .add("col_date", SqlTypeName.DATE) + .add("col_timestamp_with_local_time_zone", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) .add("col_timestamp", SqlTypeName.TIMESTAMP) .add("col_boolean", SqlTypeName.BOOLEAN) .build(); @@ -70,6 +72,7 @@ public void encodeAndDecode() throws Exception { "hello", LocalTime.now(), LocalDate.now(), + LocalDateTime.now(), DateTime.now().toInstant(), true) .build(); diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java index cc253809387b..5186099bddae 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/DateTimeUtils.java @@ -20,11 +20,13 @@ import com.google.zetasql.CivilTimeEncoder; import com.google.zetasql.Value; import io.grpc.Status; +import java.time.LocalDateTime; import java.time.LocalTime; import java.util.List; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnit; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.DateString; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimeString; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; @@ -161,10 +163,17 @@ public static DateTime parseTime(String str) { } } - public static TimeString convertTimeValueToTimeString(Value value) { - LocalTime localTime = CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue()); - return new TimeString(localTime.getHour(), localTime.getMinute(), localTime.getSecond()) - .withNanos(localTime.getNano()); + public static TimestampString convertDateTimeValueToTimeStampString(Value value) { + LocalDateTime dateTime = + CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue()); + return new TimestampString( + dateTime.getYear(), + dateTime.getMonthValue(), + dateTime.getDayOfMonth(), + dateTime.getHour(), + dateTime.getMinute(), + dateTime.getSecond()) + .withNanos(dateTime.getNano()); } // dates are represented as an int32 value, indicating the offset @@ -174,6 +183,12 @@ public static DateString convertDateValueToDateString(Value value) { return DateString.fromDaysSinceEpoch(value.getDateValue()); } + public static TimeString convertTimeValueToTimeString(Value value) { + LocalTime localTime = CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue()); + return new TimeString(localTime.getHour(), localTime.getMinute(), localTime.getSecond()) + .withNanos(localTime.getNano()); + } + public static Value parseDateToValue(String dateString) { DateTime dateTime = parseDate(dateString); return Value.createDateValue((int) (dateTime.getMillis() / MILLIS_PER_DAY)); diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java index 6ccfb265a1be..eb8d51f673f1 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java @@ -197,23 +197,23 @@ class SupportedZetaSqlBuiltinFunctions { // Time functions FunctionSignatureId.FN_CURRENT_DATE, // current_date - // FunctionSignatureId.FN_CURRENT_DATETIME, // current_datetime + FunctionSignatureId.FN_CURRENT_DATETIME, // current_datetime FunctionSignatureId.FN_CURRENT_TIME, // current_time FunctionSignatureId.FN_CURRENT_TIMESTAMP, // current_timestamp FunctionSignatureId.FN_DATE_ADD_DATE, // date_add - // FunctionSignatureId.FN_DATETIME_ADD, // datetime_add + FunctionSignatureId.FN_DATETIME_ADD, // datetime_add FunctionSignatureId.FN_TIME_ADD, // time_add FunctionSignatureId.FN_TIMESTAMP_ADD, // timestamp_add FunctionSignatureId.FN_DATE_DIFF_DATE, // date_diff - // FunctionSignatureId.FN_DATETIME_DIFF, // datetime_diff + FunctionSignatureId.FN_DATETIME_DIFF, // datetime_diff FunctionSignatureId.FN_TIME_DIFF, // time_diff FunctionSignatureId.FN_TIMESTAMP_DIFF, // timestamp_diff FunctionSignatureId.FN_DATE_SUB_DATE, // date_sub - // FunctionSignatureId.FN_DATETIME_SUB, // datetime_sub + FunctionSignatureId.FN_DATETIME_SUB, // datetime_sub FunctionSignatureId.FN_TIME_SUB, // time_sub FunctionSignatureId.FN_TIMESTAMP_SUB, // timestamp_sub FunctionSignatureId.FN_DATE_TRUNC_DATE, // date_trunc - // FunctionSignatureId.FN_DATETIME_TRUNC, // datetime_trunc + FunctionSignatureId.FN_DATETIME_TRUNC, // datetime_trunc FunctionSignatureId.FN_TIME_TRUNC, // time_trunc FunctionSignatureId.FN_TIMESTAMP_TRUNC, // timestamp_trunc FunctionSignatureId.FN_DATE_FROM_UNIX_DATE, // date_from_unix_date @@ -234,19 +234,18 @@ class SupportedZetaSqlBuiltinFunctions { FunctionSignatureId.FN_UNIX_MILLIS_FROM_TIMESTAMP, // FunctionSignatureId.FN_UNIX_MICROS_FROM_TIMESTAMP, FunctionSignatureId.FN_DATE_FROM_TIMESTAMP, // date - // FunctionSignatureId.FN_DATE_FROM_DATETIME, // date + FunctionSignatureId.FN_DATE_FROM_DATETIME, // date FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY, // date FunctionSignatureId.FN_TIMESTAMP_FROM_STRING, // timestamp FunctionSignatureId.FN_TIMESTAMP_FROM_DATE, // timestamp - // FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp + FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp FunctionSignatureId.FN_TIME_FROM_HOUR_MINUTE_SECOND, // time FunctionSignatureId.FN_TIME_FROM_TIMESTAMP, // time - // FunctionSignatureId.FN_TIME_FROM_DATETIME, // time - // FunctionSignatureId.FN_DATETIME_FROM_DATE_AND_TIME, // datetime - // FunctionSignatureId.FN_DATETIME_FROM_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND, // datetime - // FunctionSignatureId.FN_DATETIME_FROM_TIMESTAMP, // datetime - // FunctionSignatureId.FN_DATETIME_FROM_DATE, // datetime - + FunctionSignatureId.FN_TIME_FROM_DATETIME, // time + FunctionSignatureId.FN_DATETIME_FROM_DATE_AND_TIME, // datetime + FunctionSignatureId.FN_DATETIME_FROM_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND, // datetime + FunctionSignatureId.FN_DATETIME_FROM_TIMESTAMP, // datetime + FunctionSignatureId.FN_DATETIME_FROM_DATE, // datetime FunctionSignatureId.FN_STRING_FROM_TIMESTAMP, // string // Signatures for extracting date parts, taking a date/timestamp @@ -258,23 +257,24 @@ class SupportedZetaSqlBuiltinFunctions { // Signatures specific to extracting the DATE date part from a DATETIME or a // TIMESTAMP. - // FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date + FunctionSignatureId.FN_EXTRACT_DATE_FROM_DATETIME, // $extract_date FunctionSignatureId.FN_EXTRACT_DATE_FROM_TIMESTAMP, // $extract_date // Signatures specific to extracting the TIME date part from a DATETIME or a // TIMESTAMP. - // FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time + FunctionSignatureId.FN_EXTRACT_TIME_FROM_DATETIME, // $extract_time FunctionSignatureId.FN_EXTRACT_TIME_FROM_TIMESTAMP, // $extract_time // Signature specific to extracting the DATETIME date part from a TIMESTAMP. - // FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime + FunctionSignatureId.FN_EXTRACT_DATETIME_FROM_TIMESTAMP, // $extract_datetime + // Signature for formatting and parsing FunctionSignatureId.FN_FORMAT_DATE, // format_date - // FunctionSignatureId.FN_FORMAT_DATETIME, // format_datetime + FunctionSignatureId.FN_FORMAT_DATETIME, // format_datetime FunctionSignatureId.FN_FORMAT_TIME, // format_time FunctionSignatureId.FN_FORMAT_TIMESTAMP, // format_timestamp FunctionSignatureId.FN_PARSE_DATE, // parse_date - // FunctionSignatureId.FN_PARSE_DATETIME, // parse_datetime + FunctionSignatureId.FN_PARSE_DATETIME, // parse_datetime FunctionSignatureId.FN_PARSE_TIME, // parse_time FunctionSignatureId.FN_PARSE_TIMESTAMP, // parse_timestamp diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java index 073aa413700d..dbab34ab5362 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtils.java @@ -28,6 +28,7 @@ import com.google.zetasql.ZetaSQLType.TypeKind; import java.math.BigDecimal; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayList; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.DateTime; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; @@ -45,7 +47,6 @@ * Utility methods for ZetaSQL <=> Beam translation. * *

Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY - * TODO[BEAM-10238]: support ZetaSQL types: TIME, DATETIME, NUMERIC */ @Internal public final class ZetaSqlBeamTranslationUtils { @@ -106,6 +107,9 @@ private static Type beamLogicalTypeToZetaSqlType(String identifier) { } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) { // Time type return TypeFactory.createSimpleType(TypeKind.TYPE_TIME); + } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) { + // DateTime type + return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME); } else { throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier); } @@ -184,6 +188,20 @@ private static Value beamLogicalObjectToZetaSqlValue(Object object, String ident } else { // input type return Value.createTimeValue(CivilTimeEncoder.encodePacked64TimeNanos((LocalTime) object)); } + } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) { + // DateTime value + LocalDateTime datetime; + if (object instanceof Row) { // base type + datetime = + LocalDateTime.of( + LocalDate.ofEpochDay(((Row) object).getInt64(DateTime.DATE_FIELD_NAME)), + LocalTime.ofNanoOfDay(((Row) object).getInt64(DateTime.TIME_FIELD_NAME))); + } else { // input type + datetime = (LocalDateTime) object; + } + // TODO[BEAM-10611]: Create ZetaSQL Value.createDatetimeValue(LocalDateTime) function + return Value.createDatetimeValue( + CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano()); } else { throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier); } @@ -208,6 +226,8 @@ public static FieldType zetaSqlTypeToBeamFieldType(Type type) { return FieldType.logicalType(SqlTypes.DATE).withNullable(true); case TYPE_TIME: return FieldType.logicalType(SqlTypes.TIME).withNullable(true); + case TYPE_DATETIME: + return FieldType.logicalType(SqlTypes.DATETIME).withNullable(true); case TYPE_TIMESTAMP: return FieldType.DATETIME.withNullable(true); case TYPE_ARRAY: @@ -314,6 +334,9 @@ private static Object zetaSqlValueToBeamLogicalObject(Value value, String identi } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) { // Time value return CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime(value.getTimeValue()); + } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) { + // DateTime value + return CivilTimeEncoder.decodePacked96DatetimeNanosAsJavaTime(value.getDatetimeValue()); } else { throw new UnsupportedOperationException("Unknown Beam logical type: " + identifier); } diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java index d8394ab28df4..81bc14226a8a 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java @@ -20,6 +20,7 @@ import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL; import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES; import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE; +import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME; import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE; import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64; import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC; @@ -46,7 +47,6 @@ * Utility methods for ZetaSQL <=> Calcite translation. * *

Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, GEOGRAPHY - * TODO[BEAM-10238]: support ZetaSQL types: TIME, DATETIME, NUMERIC */ @Internal public final class ZetaSqlCalciteTranslationUtils { @@ -72,6 +72,8 @@ public static Type toZetaType(RelDataType calciteType) { return TypeFactory.createSimpleType(TYPE_DATE); case TIME: return TypeFactory.createSimpleType(TYPE_TIME); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return TypeFactory.createSimpleType(TYPE_DATETIME); case TIMESTAMP: return TypeFactory.createSimpleType(TYPE_TIMESTAMP); case ARRAY: @@ -107,6 +109,8 @@ public static SqlTypeName toCalciteTypeName(TypeKind type) { return SqlTypeName.DATE; case TYPE_TIME: return SqlTypeName.TIME; + case TYPE_DATETIME: + return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE; case TYPE_TIMESTAMP: // TODO: handle timestamp with time zone. return SqlTypeName.TIMESTAMP; diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java index 19f18cdb1c6b..fd5651f6d5c9 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java @@ -24,6 +24,7 @@ import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64; import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING; import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP; +import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertDateTimeValueToTimeStampString; import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertDateValueToDateString; import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.convertTimeValueToTimeString; import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.safeMicrosToMillis; @@ -545,7 +546,7 @@ public RexNode convertResolvedLiteral(ResolvedLiteral resolvedLiteral) { case TYPE_TIMESTAMP: case TYPE_DATE: case TYPE_TIME: - // case TYPE_DATETIME: + case TYPE_DATETIME: case TYPE_BYTES: case TYPE_ARRAY: case TYPE_STRUCT: @@ -709,7 +710,7 @@ private RexNode convertValueToRexNode(Type type, Value value) { case TYPE_TIMESTAMP: case TYPE_DATE: case TYPE_TIME: - // case TYPE_DATETIME: + case TYPE_DATETIME: case TYPE_BYTES: ret = convertSimpleValueToRexNode(type.getKind(), value); break; @@ -792,9 +793,7 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) { rexBuilder() .makeCall( SqlOperators.createZetaSqlFunction(wrapperFun, returnType.getSqlTypeName()), - ImmutableList.of( - rexBuilder() - .makeApproxLiteral(new BigDecimal(Math.random()), returnType))); + rexBuilder().makeApproxLiteral(new BigDecimal(Math.random()), returnType)); ; } else { ret = @@ -823,12 +822,11 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) { SqlOperators.createZetaSqlFunction( BeamBigQuerySqlDialect.NUMERIC_LITERAL_FUNCTION, ZetaSqlCalciteTranslationUtils.toCalciteTypeName(kind)), - ImmutableList.of( - rexBuilder() - .makeExactLiteral( - value.getNumericValue(), - ZetaSqlCalciteTranslationUtils.toSimpleRelDataType( - kind, rexBuilder())))); + rexBuilder() + .makeExactLiteral( + value.getNumericValue(), + ZetaSqlCalciteTranslationUtils.toSimpleRelDataType( + kind, rexBuilder()))); break; case TYPE_TIMESTAMP: ret = @@ -850,6 +848,15 @@ private RexNode convertSimpleValueToRexNode(TypeKind kind, Value value) { // TODO: Doing micro to mills truncation, need to throw exception. ret = rexBuilder().makeLiteral(convertTimeValueToTimeString(value), timeType, false); break; + case TYPE_DATETIME: + ret = + rexBuilder() + .makeTimestampWithLocalTimeZoneLiteral( + convertDateTimeValueToTimeStampString(value), + typeFactory() + .getTypeSystem() + .getMaxPrecision(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)); + break; case TYPE_BYTES: ret = rexBuilder().makeBinaryLiteral(new ByteString(value.getBytesValue().toByteArray())); break; diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java index 25f40edf1c6a..9137b94681a5 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/TableScanConverter.java @@ -17,17 +17,13 @@ */ package org.apache.beam.sdk.extensions.sql.zetasql.translation; -import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME; import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkNotNull; -import com.google.zetasql.ZetaSQLType.TypeKind; -import com.google.zetasql.resolvedast.ResolvedColumn; import com.google.zetasql.resolvedast.ResolvedNodes.ResolvedTableScan; import java.util.List; import java.util.Properties; import org.apache.beam.sdk.extensions.sql.zetasql.TableResolution; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.config.CalciteConnectionConfigImpl; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteSchema; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; @@ -44,16 +40,12 @@ /** Converts table scan. */ class TableScanConverter extends RelConverter { - private static final ImmutableSet UNSUPPORTED_DATA_TYPES = - ImmutableSet.of(TYPE_DATETIME); - TableScanConverter(ConversionContext context) { super(context); } @Override public RelNode convert(ResolvedTableScan zetaNode, List inputs) { - checkTableScanSchema(zetaNode.getColumnList()); List tablePath = getTablePath(zetaNode.getTable()); @@ -115,15 +107,4 @@ public RelOptCluster getCluster() { } }; } - - private void checkTableScanSchema(List columnList) { - if (columnList != null) { - for (ResolvedColumn resolvedColumn : columnList) { - if (UNSUPPORTED_DATA_TYPES.contains(resolvedColumn.getType().getKind())) { - throw new UnsupportedOperationException( - "Does not support " + UNSUPPORTED_DATA_TYPES + " types in source tables"); - } - } - } - } } diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java index a761d9f59b7d..2edb4d03daf3 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/TestInput.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Arrays; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; @@ -225,47 +226,59 @@ class TestInput { public static final TestBoundedTable TABLE_EMPTY = TestBoundedTable.of(Schema.builder().addInt64Field("ColId").addStringField("Value").build()); - private static final Schema TABLE_WTH_MAP_SCHEMA = + private static final Schema TABLE_WITH_MAP_SCHEMA = Schema.builder() .addMapField("map_field", FieldType.STRING, FieldType.STRING) .addRowField("row_field", structSchema) .build(); public static final TestBoundedTable TABLE_WITH_MAP = - TestBoundedTable.of(TABLE_WTH_MAP_SCHEMA) + TestBoundedTable.of(TABLE_WITH_MAP_SCHEMA) .addRows( ImmutableMap.of("MAP_KEY_1", "MAP_VALUE_1"), Row.withSchema(structSchema).addValues(1L, "data1").build()); - private static final Schema TABLE_WTH_DATE_SCHEMA = + private static final Schema TABLE_WITH_DATE_SCHEMA = Schema.builder() .addLogicalTypeField("date_field", SqlTypes.DATE) .addStringField("str_field") .build(); public static final TestBoundedTable TABLE_WITH_DATE = - TestBoundedTable.of(TABLE_WTH_DATE_SCHEMA) + TestBoundedTable.of(TABLE_WITH_DATE_SCHEMA) .addRows(LocalDate.of(2008, 12, 25), "s") .addRows(LocalDate.of(2020, 4, 7), "s"); - private static final Schema TABLE_WTH_TIME_SCHEMA = + private static final Schema TABLE_WITH_TIME_SCHEMA = Schema.builder() .addLogicalTypeField("time_field", SqlTypes.TIME) .addStringField("str_field") .build(); public static final TestBoundedTable TABLE_WITH_TIME = - TestBoundedTable.of(TABLE_WTH_TIME_SCHEMA) + TestBoundedTable.of(TABLE_WITH_TIME_SCHEMA) .addRows(LocalTime.of(15, 30, 0), "s") .addRows(LocalTime.of(23, 35, 59), "s"); - private static final Schema TABLE_WTH_NUMERIC_SCHEMA = + private static final Schema TABLE_WITH_NUMERIC_SCHEMA = Schema.builder().addDecimalField("numeric_field").addStringField("str_field").build(); + public static final TestBoundedTable TABLE_WITH_NUMERIC = - TestBoundedTable.of(TABLE_WTH_NUMERIC_SCHEMA) + TestBoundedTable.of(TABLE_WITH_NUMERIC_SCHEMA) .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"), "str1") .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"), "str2") .addRows(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"), "str3"); + private static final Schema TABLE_WITH_DATETIME_SCHEMA = + Schema.builder() + .addLogicalTypeField("datetime_field", SqlTypes.DATETIME) + .addStringField("str_field") + .build(); + + public static final TestBoundedTable TABLE_WITH_DATETIME = + TestBoundedTable.of(TABLE_WITH_DATETIME_SCHEMA) + .addRows(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000), "s") + .addRows(LocalDateTime.of(2012, 10, 6, 11, 45, 0).withNano(987654000), "s"); + private static byte[] stringToBytes(String s) { return s.getBytes(StandardCharsets.UTF_8); } diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java index 05105907c8cd..7b450fb3b4d6 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlBeamTranslationUtilsTest.java @@ -28,6 +28,7 @@ import com.google.zetasql.Value; import com.google.zetasql.ZetaSQLType.TypeKind; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Arrays; import org.apache.beam.sdk.schemas.Schema; @@ -54,12 +55,12 @@ public class ZetaSqlBeamTranslationUtilsTest { .addField("f_string", FieldType.STRING) .addField("f_bytes", FieldType.BYTES) .addLogicalTypeField("f_date", SqlTypes.DATE) - // .addLogicalTypeField("f_datetime", SqlTypes.DATETIME) + .addLogicalTypeField("f_datetime", SqlTypes.DATETIME) .addLogicalTypeField("f_time", SqlTypes.TIME) .addField("f_timestamp", FieldType.DATETIME) .addArrayField("f_array", FieldType.DOUBLE) .addRowField("f_struct", TEST_INNER_SCHEMA) - // .addLogicalTypeField("f_numeric", SqlTypes.NUMERIC) + .addField("f_numeric", FieldType.DECIMAL) .addNullableField("f_null", FieldType.INT64) .build(); @@ -83,10 +84,12 @@ public class ZetaSqlBeamTranslationUtilsTest { new StructField("f_string", TypeFactory.createSimpleType(TypeKind.TYPE_STRING)), new StructField("f_bytes", TypeFactory.createSimpleType(TypeKind.TYPE_BYTES)), new StructField("f_date", TypeFactory.createSimpleType(TypeKind.TYPE_DATE)), + new StructField("f_datetime", TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME)), new StructField("f_time", TypeFactory.createSimpleType(TypeKind.TYPE_TIME)), new StructField("f_timestamp", TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP)), new StructField("f_array", TEST_INNER_ARRAY_TYPE), new StructField("f_struct", TEST_INNER_STRUCT_TYPE), + new StructField("f_numeric", TypeFactory.createSimpleType(TypeKind.TYPE_NUMERIC)), new StructField("f_null", TypeFactory.createSimpleType(TypeKind.TYPE_INT64)))); private static final Row TEST_ROW = @@ -97,10 +100,12 @@ public class ZetaSqlBeamTranslationUtilsTest { .addValue("Hello") .addValue(new byte[] {0x11, 0x22}) .addValue(LocalDate.of(2020, 6, 4)) + .addValue(LocalDateTime.of(2008, 12, 25, 15, 30, 0)) .addValue(LocalTime.of(15, 30, 45)) .addValue(Instant.ofEpochMilli(12345678L)) .addArray(3.0, 6.5) .addValue(Row.withSchema(TEST_INNER_SCHEMA).addValues(0L, "world").build()) + .addValue(ZetaSqlTypesUtils.bigDecimalAsNumeric("12346")) .addValue(null) .build(); @@ -114,6 +119,10 @@ public class ZetaSqlBeamTranslationUtilsTest { Value.createStringValue("Hello"), Value.createBytesValue(ByteString.copyFrom(new byte[] {0x11, 0x22})), Value.createDateValue((int) LocalDate.of(2020, 6, 4).toEpochDay()), + Value.createDatetimeValue( + CivilTimeEncoder.encodePacked64DatetimeSeconds( + LocalDateTime.of(2008, 12, 25, 15, 30, 0)), + LocalDateTime.of(2008, 12, 25, 15, 30, 0).getNano()), Value.createTimeValue( CivilTimeEncoder.encodePacked64TimeNanos(LocalTime.of(15, 30, 45))), Value.createTimestampValueFromUnixMicros(12345678000L), @@ -123,6 +132,7 @@ public class ZetaSqlBeamTranslationUtilsTest { Value.createStructValue( TEST_INNER_STRUCT_TYPE, Arrays.asList(Value.createInt64Value(0L), Value.createStringValue("world"))), + Value.createNumericValue(ZetaSqlTypesUtils.bigDecimalAsNumeric("12346")), Value.createNullValue(TypeFactory.createSimpleType(TypeKind.TYPE_INT64)))); @Test diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java index e9c51c76374e..d1480127f605 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java @@ -2942,7 +2942,7 @@ public void testUNNESTParameters() { } @Test - @Ignore("BEAM-9515") + @Ignore("[BEAM-9515] ArrayScanToUncollectConverter Unnest does not support sub-queries") public void testUNNESTExpression() { String sql = "SELECT * FROM UNNEST(ARRAY(SELECT Value FROM KeyValue));"; ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java index eacb6b88b33b..937fb6c33c61 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTestBase.java @@ -60,6 +60,7 @@ private TableProvider createBeamTableProvider() { testBoundedTableMap.put("table_with_date", TestInput.TABLE_WITH_DATE); testBoundedTableMap.put("table_with_time", TestInput.TABLE_WITH_TIME); testBoundedTableMap.put("table_with_numeric", TestInput.TABLE_WITH_NUMERIC); + testBoundedTableMap.put("table_with_datetime", TestInput.TABLE_WITH_DATETIME); testBoundedTableMap.put( "table_with_struct_ts_string", TestInput.TABLE_WITH_STRUCT_TIMESTAMP_STRING); testBoundedTableMap.put("streaming_sql_test_table_a", TestInput.STREAMING_SQL_TABLE_A); diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java index 6789d63b2f19..109ca1e0af40 100644 --- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java +++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTimeFunctionsTest.java @@ -23,9 +23,11 @@ import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithTimeZone; import static org.apache.beam.sdk.extensions.sql.zetasql.DateTimeUtils.parseTimestampWithUTCTimeZone; +import com.google.zetasql.CivilTimeEncoder; import com.google.zetasql.Value; import com.google.zetasql.ZetaSQLType.TypeKind; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.LocalTime; import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; @@ -40,7 +42,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Duration; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -148,7 +149,7 @@ public void testExtractFromDate() { + " EXTRACT(ISOYEAR FROM date) AS isoyear,\n" + " EXTRACT(YEAR FROM date) AS year,\n" + " EXTRACT(ISOWEEK FROM date) AS isoweek,\n" - // TODO[BEAM-9178]: Add tests for DATE_TRUNC and EXTRACT with "week with weekday" date + // TODO[BEAM-10606]: Add tests for DATE_TRUNC and EXTRACT with "week with weekday" date // parts once they are supported // + " EXTRACT(WEEK FROM date) AS week,\n" + " EXTRACT(MONTH FROM date) AS month,\n" @@ -218,6 +219,22 @@ public void testDateFromTimestamp() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test + public void testDateFromDateTime() { + String sql = "SELECT DATE(DATETIME '2008-12-25 15:30:00.123456')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addLogicalTypeField("f_date", SqlTypes.DATE).build()) + .addValues(LocalDate.of(2008, 12, 25)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + @Test public void testDateAdd() { String sql = @@ -579,6 +596,22 @@ public void testTimeFromTimestamp() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test + public void testTimeFromDateTime() { + String sql = "SELECT TIME(DATETIME '2008-12-25 15:30:00.123456')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build()) + .addValues(LocalTime.of(15, 30, 0, 123456000)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + @Test public void testTimeAdd() { String sql = @@ -753,13 +786,420 @@ public void testParseTime() { ///////////////////////////////////////////////////////////////////////////// @Test - @Ignore("Does not support Datetime literal.") - public void testDatetimeLiteral() { - String sql = "SELECT DATETIME '2018-01-01 05:30:00.334'"; + public void testDateTimeLiteral() { + String sql = "SELECT DATETIME '2008-12-25 15:30:00.123456'"; + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unsupported ResolvedLiteral type: DATETIME"); - zetaSQLQueryPlanner.convertToBeamRel(sql); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build()) + .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeColumn() { + String sql = "SELECT FORMAT_DATETIME('%D %T %E6S', datetime_field) FROM table_with_datetime"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addStringField("f_datetime_str").build()) + .addValues("12/25/08 15:30:00 00.123456") + .build(), + Row.withSchema(Schema.builder().addStringField("f_datetime_str").build()) + .addValues("10/06/12 11:45:00 00.987654") + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testGroupByDateTime() { + String sql = "SELECT datetime_field, COUNT(*) FROM table_with_datetime GROUP BY datetime_field"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + final Schema schema = + Schema.builder() + .addLogicalTypeField("datetime_field", SqlTypes.DATETIME) + .addInt64Field("count") + .build(); + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(schema) + .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000), 1L) + .build(), + Row.withSchema(schema) + .addValues(LocalDateTime.of(2012, 10, 6, 11, 45, 0).withNano(987654000), 1L) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testAggregateOnDateTime() { + String sql = "SELECT MAX(datetime_field) FROM table_with_datetime GROUP BY str_field"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder() + .addLogicalTypeField("datetime_field", SqlTypes.DATETIME) + .build()) + .addValues(LocalDateTime.of(2012, 10, 6, 11, 45, 0).withNano(987654000)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + // TODO[BEAM-9166]: Add a test for CURRENT_DATETIME function ("SELECT CURRENT_DATETIME()") + + @Test + public void testExtractFromDateTime() { + String sql = + "SELECT " + + "EXTRACT(YEAR FROM DATETIME '2008-12-25 15:30:00') as year, " + + "EXTRACT(QUARTER FROM DATETIME '2008-12-25 15:30:00') as quarter, " + + "EXTRACT(MONTH FROM DATETIME '2008-12-25 15:30:00') as month, " + // TODO[BEAM-10606]: Add tests for DATETIME_TRUNC and EXTRACT with "week with weekday" + // date parts once they are supported + // + "EXTRACT(WEEK FROM DATETIME '2008-12-25 15:30:00') as week, " + + "EXTRACT(DAY FROM DATETIME '2008-12-25 15:30:00') as day, " + + "EXTRACT(DAYOFWEEK FROM DATETIME '2008-12-25 15:30:00') as dayofweek, " + + "EXTRACT(DAYOFYEAR FROM DATETIME '2008-12-25 15:30:00') as dayofyear, " + + "EXTRACT(HOUR FROM DATETIME '2008-12-25 15:30:00.123456') as hour, " + + "EXTRACT(MINUTE FROM DATETIME '2008-12-25 15:30:00.123456') as minute, " + + "EXTRACT(SECOND FROM DATETIME '2008-12-25 15:30:00.123456') as second, " + + "EXTRACT(MILLISECOND FROM DATETIME '2008-12-25 15:30:00.123456') as millisecond, " + + "EXTRACT(MICROSECOND FROM DATETIME '2008-12-25 15:30:00.123456') as microsecond, " + + "EXTRACT(DATE FROM DATETIME '2008-12-25 15:30:00.123456') as date, " + + "EXTRACT(TIME FROM DATETIME '2008-12-25 15:30:00.123456') as time "; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + final Schema schema = + Schema.builder() + .addInt64Field("year") + .addInt64Field("quarter") + .addInt64Field("month") + // .addInt64Field("week") + .addInt64Field("day") + .addInt64Field("dayofweek") + .addInt64Field("dayofyear") + .addInt64Field("hour") + .addInt64Field("minute") + .addInt64Field("second") + .addInt64Field("millisecond") + .addInt64Field("microsecond") + .addLogicalTypeField("date", SqlTypes.DATE) + .addLogicalTypeField("time", SqlTypes.TIME) + .build(); + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(schema) + .addValues( + 2008L, + 4L, + 12L, + // 52L, + 25L, + 5L, + 360L, + 15L, + 30L, + 0L, + 123L, + 123456L, + LocalDate.of(2008, 12, 25), + LocalTime.of(15, 30, 0, 123456000)) + .build()); + + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeFromDateAndTime() { + String sql = "SELECT DATETIME(DATE '2008-12-25', TIME '15:30:00.123456')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build()) + .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeFromDate() { + String sql = "SELECT DATETIME(DATE '2008-12-25')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build()) + .addValues(LocalDateTime.of(2008, 12, 25, 0, 0, 0)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeFromYearMonthDayHourMinuteSecond() { + String sql = "SELECT DATETIME(2008, 12, 25, 15, 30, 0)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build()) + .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeFromTimestamp() { + String sql = "SELECT DATETIME(TIMESTAMP '2008-12-25 15:30:00+08', 'America/Los_Angeles')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build()) + .addValues(LocalDateTime.of(2008, 12, 24, 23, 30, 0)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeAdd() { + String sql = + "SELECT " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MICROSECOND), " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MILLISECOND), " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 SECOND), " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MINUTE), " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 HOUR), " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 DAY), " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MONTH), " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 QUARTER), " + + "DATETIME_ADD(DATETIME '2008-12-25 15:30:00', INTERVAL 10 YEAR) "; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder() + .addLogicalTypeField("f_time1", SqlTypes.DATETIME) + .addLogicalTypeField("f_time2", SqlTypes.DATETIME) + .addLogicalTypeField("f_time3", SqlTypes.DATETIME) + .addLogicalTypeField("f_time4", SqlTypes.DATETIME) + .addLogicalTypeField("f_time5", SqlTypes.DATETIME) + .addLogicalTypeField("f_time6", SqlTypes.DATETIME) + .addLogicalTypeField("f_time7", SqlTypes.DATETIME) + .addLogicalTypeField("f_time8", SqlTypes.DATETIME) + .addLogicalTypeField("f_time9", SqlTypes.DATETIME) + .build()) + .addValues( + LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(10000), + LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(10000000), + LocalDateTime.of(2008, 12, 25, 15, 30, 10), + LocalDateTime.of(2008, 12, 25, 15, 40, 0), + LocalDateTime.of(2008, 12, 26, 1, 30, 0), + LocalDateTime.of(2009, 1, 4, 15, 30, 0), + LocalDateTime.of(2009, 10, 25, 15, 30, 0), + LocalDateTime.of(2011, 6, 25, 15, 30, 0), + LocalDateTime.of(2018, 12, 25, 15, 30, 0)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeAddWithParameter() { + String sql = "SELECT DATETIME_ADD(@p0, INTERVAL @p1 HOUR)"; + + LocalDateTime datetime = LocalDateTime.of(2008, 12, 25, 15, 30, 00).withNano(123456000); + ImmutableMap params = + ImmutableMap.of( + "p0", + Value.createDatetimeValue( + CivilTimeEncoder.encodePacked64DatetimeSeconds(datetime), datetime.getNano()), + "p1", Value.createInt64Value(3L)); + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build()) + .addValues(LocalDateTime.of(2008, 12, 25, 18, 30, 00).withNano(123456000)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeSub() { + String sql = + "SELECT " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MICROSECOND), " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MILLISECOND), " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 SECOND), " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MINUTE), " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 HOUR), " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 DAY), " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 MONTH), " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 QUARTER), " + + "DATETIME_SUB(DATETIME '2008-12-25 15:30:00', INTERVAL 10 YEAR) "; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder() + .addLogicalTypeField("f_time1", SqlTypes.DATETIME) + .addLogicalTypeField("f_time2", SqlTypes.DATETIME) + .addLogicalTypeField("f_time3", SqlTypes.DATETIME) + .addLogicalTypeField("f_time4", SqlTypes.DATETIME) + .addLogicalTypeField("f_time5", SqlTypes.DATETIME) + .addLogicalTypeField("f_time6", SqlTypes.DATETIME) + .addLogicalTypeField("f_time7", SqlTypes.DATETIME) + .addLogicalTypeField("f_time8", SqlTypes.DATETIME) + .addLogicalTypeField("f_time9", SqlTypes.DATETIME) + .build()) + .addValues( + LocalDateTime.of(2008, 12, 25, 15, 29, 59).withNano(999990000), + LocalDateTime.of(2008, 12, 25, 15, 29, 59).withNano(990000000), + LocalDateTime.of(2008, 12, 25, 15, 29, 50), + LocalDateTime.of(2008, 12, 25, 15, 20, 0), + LocalDateTime.of(2008, 12, 25, 5, 30, 0), + LocalDateTime.of(2008, 12, 15, 15, 30, 0), + LocalDateTime.of(2008, 2, 25, 15, 30, 0), + LocalDateTime.of(2006, 6, 25, 15, 30, 0), + LocalDateTime.of(1998, 12, 25, 15, 30, 0)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeDiff() { + String sql = + "SELECT DATETIME_DIFF(DATETIME '2008-12-25 15:30:00', DATETIME '2008-10-25 15:30:00', DAY)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addInt64Field("f_datetime_diff").build()) + .addValues(61L) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeDiffNegativeResult() { + String sql = + "SELECT DATETIME_DIFF(DATETIME '2008-10-25 15:30:00', DATETIME '2008-12-25 15:30:00', DAY)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addInt64Field("f_datetime_diff").build()) + .addValues(-61L) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testDateTimeTrunc() { + String sql = "SELECT DATETIME_TRUNC(DATETIME '2008-12-25 15:30:00', HOUR)"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder() + .addLogicalTypeField("f_datetime_trunc", SqlTypes.DATETIME) + .build()) + .addValues(LocalDateTime.of(2008, 12, 25, 15, 0, 0)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testFormatDateTime() { + String sql = "SELECT FORMAT_DATETIME('%D %T %E6S', DATETIME '2008-12-25 15:30:00.123456')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addStringField("f_datetime_str").build()) + .addValues("12/25/08 15:30:00 00.123456") + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testParseDateTime() { + String sql = "SELECT PARSE_DATETIME('%Y-%m-%d %H:%M:%E6S', '2008-12-25 15:30:00.123456')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("f_datetime", SqlTypes.DATETIME).build()) + .addValues(LocalDateTime.of(2008, 12, 25, 15, 30, 0).withNano(123456000)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } ///////////////////////////////////////////////////////////////////////////// @@ -846,7 +1286,7 @@ public void testExtractFromTimestamp() { + " EXTRACT(ISOYEAR FROM timestamp) AS isoyear,\n" + " EXTRACT(YEAR FROM timestamp) AS year,\n" + " EXTRACT(ISOWEEK FROM timestamp) AS isoweek,\n" - // TODO[BEAM-9178]: Add tests for TIMESTAMP_TRUNC and EXTRACT with "week with weekday" + // TODO[BEAM-10606]: Add tests for TIMESTAMP_TRUNC and EXTRACT with "week with weekday" // date parts once they are supported // + " EXTRACT(WEEK FROM timestamp) AS week,\n" + " EXTRACT(MONTH FROM timestamp) AS month,\n" @@ -925,6 +1365,23 @@ public void testExtractTimeFromTimestamp() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test + public void testExtractDateTimeFromTimestamp() { + String sql = "SELECT EXTRACT(DATETIME FROM TIMESTAMP '2017-05-26 12:34:56')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema( + Schema.builder().addLogicalTypeField("datetime", SqlTypes.DATETIME).build()) + .addValues(LocalDateTime.of(2017, 5, 26, 12, 34, 56)) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + @Test public void testExtractFromTimestampAtTimeZone() { String sql = @@ -1027,6 +1484,45 @@ public void testTimestampFromDateWithDefaultTimezoneSet() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + @Test + public void testTimestampFromDateTime() { + String sql = "SELECT TIMESTAMP(DATETIME '2008-12-25 15:30:00')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addDateTimeField("f_timestamp").build()) + .addValues(parseTimestampWithTimeZone("2008-12-25 15:30:00+00")) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + // test default timezone works properly in query execution stage + public void testTimestampFromDateTimeWithDefaultTimezoneSet() { + String sql = "SELECT TIMESTAMP(DATETIME '2008-12-25 15:30:00')"; + + ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); + zetaSQLQueryPlanner.setDefaultTimezone("Asia/Shanghai"); + pipeline + .getOptions() + .as(BeamSqlPipelineOptions.class) + .setZetaSqlDefaultTimezone("Asia/Shanghai"); + + BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); + PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + + PAssert.that(stream) + .containsInAnyOrder( + Row.withSchema(Schema.builder().addDateTimeField("f_timestamp").build()) + .addValues(parseTimestampWithTimeZone("2008-12-25 15:30:00+08")) + .build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + @Test public void testTimestampAdd() { String sql =