From 672c31a26c984e3ce7fc1b628dbbe23824aad8f3 Mon Sep 17 00:00:00 2001 From: Flavio Pompermaier Date: Sun, 26 Apr 2020 13:16:35 +0200 Subject: [PATCH 1/3] Handled problem of numeric with 0 precision --- .../java/io/jdbc/catalog/PostgresCatalog.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java index e5503f6fec70f0..ee7b1ee094f2f5 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalog.java @@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.DecimalType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,6 +227,8 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep public static final String PG_BIGINT_ARRAY = "_int8"; public static final String PG_REAL = "float4"; public static final String PG_REAL_ARRAY = "_float4"; + public static final String PG_DECIMAL = "decimal"; + public static final String PG_DECIMAL_ARRAY = "_decimal"; public static final String PG_DOUBLE_PRECISION = "float8"; public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8"; public static final String PG_NUMERIC = "numeric"; @@ -284,11 +287,20 @@ private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws S return DataTypes.DOUBLE(); case PG_DOUBLE_PRECISION_ARRAY: return DataTypes.ARRAY(DataTypes.DOUBLE()); + case PG_DECIMAL: case PG_NUMERIC: - return DataTypes.DECIMAL(precision, metadata.getScale(colIndex)); + // see SPARK-26538: handle numeric without explicit precision and scale. + if (precision > 0) { + return DataTypes.DECIMAL(precision, metadata.getScale(colIndex)); + } + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18); + case PG_DECIMAL_ARRAY: case PG_NUMERIC_ARRAY: - return DataTypes.ARRAY( - DataTypes.DECIMAL(precision, metadata.getScale(colIndex))); + // see SPARK-26538: handle numeric without explicit precision and scale. + if (precision > 0) { + return DataTypes.ARRAY(DataTypes.DECIMAL(precision, metadata.getScale(colIndex))); + } + return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)); case PG_CHAR: case PG_CHARACTER: return DataTypes.CHAR(precision); From a9bfe785bfe71ef25457c83c0ef9b790a3a564d0 Mon Sep 17 00:00:00 2001 From: Flavio Pompermaier Date: Sun, 26 Apr 2020 14:08:24 +0200 Subject: [PATCH 2/3] Handled default numeric of postgres (precision 0, scale 0) --- .../api/java/io/jdbc/catalog/PostgresCatalogITCase.java | 2 +- .../api/java/io/jdbc/catalog/PostgresCatalogTestBase.java | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java index b2c59e1ca0f346..1938291ecd5a59 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java @@ -85,7 +85,7 @@ public void testPrimitiveTypes() throws Exception { List results = TableUtils.collectToList( tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE))); - assertEquals("[1,[50],3,4,5.5,6.6,7.70000,true,a,b,c ,d,2016-06-22T19:10:25,2015-01-01,00:51:03]", results.toString()); + assertEquals("[1,[50],3,4,5.5,6.6,7.70000,true,a,b,c ,d,2016-06-22T19:10:25,2015-01-01,00:51:03,500.000000000000000000]", results.toString()); } @Test diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogTestBase.java index 878c02b093d8e1..0cad3f94c5d6dc 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogTestBase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogTestBase.java @@ -20,6 +20,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.types.logical.DecimalType; import com.opentable.db.postgres.junit.EmbeddedPostgresRules; import com.opentable.db.postgres.junit.SingleInstancePostgresRule; @@ -174,6 +175,7 @@ public static TestTable getPrimitiveTable() { // .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)) .field("date", DataTypes.DATE()) .field("time", DataTypes.TIME(0)) + .field("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)) .build(), "int integer, " + "bytea bytea, " + @@ -190,7 +192,8 @@ public static TestTable getPrimitiveTable() { "timestamp timestamp(5), " + // "timestamptz timestamptz(4), " + "date date," + - "time time(0)", + "time time(0), " + + "default_numeric numeric ", "1," + "'2'," + "3," + @@ -206,7 +209,8 @@ public static TestTable getPrimitiveTable() { "'2016-06-22 19:10:25'," + // "'2006-06-22 19:10:25'," + "'2015-01-01'," + - "'00:51:02.746572'" + "'00:51:02.746572', " + + "500" ); } From 174b4dbf50719f2a8a215be245bbd469acc3b3d5 Mon Sep 17 00:00:00 2001 From: Flavio Pompermaier Date: Fri, 1 May 2020 01:16:19 +0200 Subject: [PATCH 3/3] Added array test in PostgresCatalogITCase#testArrayTypes() --- .../flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java | 1 + .../api/java/io/jdbc/catalog/PostgresCatalogTestBase.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java index 1938291ecd5a59..29f8d0d8cabb07 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogITCase.java @@ -103,6 +103,7 @@ public void testArrayTypes() throws Exception { "[5.5, 6.6, 7.7]," + "[6.6, 7.7, 8.8]," + "[7.70000, 8.80000, 9.90000]," + + "[8.800000000000000000, 9.900000000000000000, 10.100000000000000000]," + "[true, false, true]," + "[a, b, c]," + "[b, c, d]," + diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogTestBase.java index 0cad3f94c5d6dc..4203118f983857 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogTestBase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/catalog/PostgresCatalogTestBase.java @@ -225,6 +225,7 @@ public static TestTable getArrayTable() { .field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT())) .field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE())) .field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5))) + .field("numeric_arr_default", DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))) .field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN())) .field("text_arr", DataTypes.ARRAY(DataTypes.STRING())) .field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1))) @@ -243,6 +244,7 @@ public static TestTable getArrayTable() { "real_arr real[], " + "double_precision_arr double precision[], " + "numeric_arr numeric(10, 5)[], " + + "numeric_arr_default numeric[], " + "boolean_arr boolean[], " + "text_arr text[], " + "char_arr char[], " + @@ -260,6 +262,7 @@ public static TestTable getArrayTable() { "'{5.5,6.6,7.7}'," + "'{6.6,7.7,8.8}'," + "'{7.7,8.8,9.9}'," + + "'{8.8,9.9,10.10}'," + "'{true,false,true}'," + "'{a,b,c}'," + "'{b,c,d}'," +