diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 5cb3e8746a19d7..07e07723197469 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -1054,13 +1054,11 @@ public List getAuxiliaryFunctions() { public static final SqlAggFunction SINGLE_VALUE = SqlStdOperatorTable.SINGLE_VALUE; // ARRAY OPERATORS - public static final SqlOperator ARRAY_VALUE_CONSTRUCTOR = - SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR; + public static final SqlOperator ARRAY_VALUE_CONSTRUCTOR = new SqlArrayConstructor(); public static final SqlOperator ELEMENT = SqlStdOperatorTable.ELEMENT; // MAP OPERATORS - public static final SqlOperator MAP_VALUE_CONSTRUCTOR = - SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR; + public static final SqlOperator MAP_VALUE_CONSTRUCTOR = new SqlMapConstructor(); // ARRAY MAP SHARED OPERATORS public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM; diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayConstructor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayConstructor.java new file mode 100644 index 00000000000000..e7bbd382bf1b68 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlArrayConstructor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.fun.SqlArrayValueConstructor; +import org.apache.calcite.sql.type.SqlTypeUtil; + +/** + * {@link SqlOperator} for ARRAY, which makes explicit casting if the element type not + * equals the derived component type. + */ +public class SqlArrayConstructor extends SqlArrayValueConstructor { + + @Override + public RelDataType inferReturnType(SqlOperatorBinding opBinding) { + RelDataType type = + getComponentType(opBinding.getTypeFactory(), opBinding.collectOperandTypes()); + if (null == type) { + return null; + } + + // explicit cast elements to component type if they are not same + SqlValidatorUtils.adjustTypeForArrayConstructor(type, opBinding); + + return SqlTypeUtil.createArrayType(opBinding.getTypeFactory(), type, false); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java new file mode 100644 index 00000000000000..a54cac56641023 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlMapConstructor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.fun.SqlMapValueConstructor; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; + +import java.util.List; + +/** + * {@link SqlOperator} for MAP, which makes explicit casting if the element type not + * equals the derived component type. + */ +public class SqlMapConstructor extends SqlMapValueConstructor { + + @Override + public RelDataType inferReturnType(SqlOperatorBinding opBinding) { + Pair type = + getComponentTypes(opBinding.getTypeFactory(), opBinding.collectOperandTypes()); + if (null == type) { + return null; + } + + // explicit cast elements to component type if they are not same + SqlValidatorUtils.adjustTypeForMapConstructor(type, opBinding); + + return SqlTypeUtil.createMapType(opBinding.getTypeFactory(), type.left, type.right, false); + } + + private Pair getComponentTypes( + RelDataTypeFactory typeFactory, List argTypes) { + return Pair.of( + typeFactory.leastRestrictive(Util.quotientList(argTypes, 2, 0)), + typeFactory.leastRestrictive(Util.quotientList(argTypes, 2, 1))); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java new file mode 100644 index 00000000000000..279541eec739f9 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/SqlValidatorUtils.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.utils; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperatorBinding; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** Utility methods related to SQL validation. */ +public class SqlValidatorUtils { + + public static void adjustTypeForArrayConstructor( + RelDataType componentType, SqlOperatorBinding opBinding) { + if (opBinding instanceof SqlCallBinding) { + adjustTypeForMultisetConstructor( + componentType, componentType, (SqlCallBinding) opBinding); + } + } + + public static void adjustTypeForMapConstructor( + Pair componentType, SqlOperatorBinding opBinding) { + if (opBinding instanceof SqlCallBinding) { + adjustTypeForMultisetConstructor( + componentType.getKey(), componentType.getValue(), (SqlCallBinding) opBinding); + } + } + + /** + * When the element element does not equal with the component type, making explicit casting. + * + * @param evenType derived type for element with even index + * @param oddType derived type for element with odd index + * @param sqlCallBinding description of call + */ + private static void adjustTypeForMultisetConstructor( + RelDataType evenType, RelDataType oddType, SqlCallBinding sqlCallBinding) { + SqlCall call = sqlCallBinding.getCall(); + List operandTypes = sqlCallBinding.collectOperandTypes(); + List operands = call.getOperandList(); + RelDataType elementType; + for (int i = 0; i < operands.size(); i++) { + if (i % 2 == 0) { + elementType = evenType; + } else { + elementType = oddType; + } + if (operandTypes.get(i).equalsSansFieldNames(elementType)) { + continue; + } + call.setOperand(i, castTo(operands.get(i), elementType)); + } + } + + private static SqlNode castTo(SqlNode node, RelDataType type) { + return SqlStdOperatorTable.CAST.createCall( + SqlParserPos.ZERO, + node, + SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable())); + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml index ccce38d4d56158..fdd65b4b34b2ac 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml @@ -22,13 +22,13 @@ limitations under the License. @@ -65,6 +65,40 @@ LogicalProject(a=[$0], b=[$1], c=[$2]) 20))]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -342,13 +376,13 @@ LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable4, sourc diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml index fac487597703cd..eee8884b7cd760 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml @@ -22,13 +22,13 @@ limitations under the License. @@ -48,6 +48,40 @@ LogicalProject(a=[$0], b=[$1], c=[$2]) 20))]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -343,13 +377,13 @@ LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable4, sourc diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala index d6fe1890dd96cc..cdd34e22df15ba 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala @@ -171,4 +171,14 @@ class CalcTest extends TableTestBase { def testOrWithIsNullInIf(): Unit = { util.verifyExecPlan("SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable") } + + @Test + def testDecimalArrayWithDifferentPrecision(): Unit = { + util.verifyExecPlan("SELECT ARRAY[0.12, 0.5, 0.99]") + } + + @Test + def testDecimalMapWithDifferentPrecision(): Unit = { + util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]") + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala index 7b1453662c341c..659c345649ce02 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala @@ -170,4 +170,14 @@ class CalcTest extends TableTestBase { def testOrWithIsNullInIf(): Unit = { util.verifyExecPlan("SELECT IF(c = '' OR c IS NULL, 'a', 'b') FROM MyTable") } + + @Test + def testDecimalArrayWithDifferentPrecision(): Unit = { + util.verifyExecPlan("SELECT ARRAY[0.12, 0.5, 0.99]") + } + + @Test + def testDecimalMapWithDifferentPrecision(): Unit = { + util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]") + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 723800f5d94fbf..1bc6141cb71716 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -407,4 +407,30 @@ class CalcITCase extends StreamingTestBase { List("1,HI,1111,true,111","2,HELLO,2222,false,222", "3,HELLO WORLD,3333,true,333") assertEquals(expected.sorted, sink.getAppendResults.sorted) } + + @Test + def testDecimalArrayWithDifferentPrecision(): Unit = { + val sqlQuery = "SELECT ARRAY[0.12, 0.5, 0.99]" + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("[0.12, 0.50, 0.99]") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } + + @Test + def testDecimalMapWithDifferentPrecision(): Unit = { + val sqlQuery = "SELECT Map['a', 0.12, 'b', 0.5]" + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("{a=0.12, b=0.50}") + assertEquals(expected.sorted, sink.getAppendResults.sorted) + } }