From 484ff6e1ce588a2b4ab788fbc2d6754f4a1364ca Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 9 Dec 2024 20:47:08 +0800 Subject: [PATCH 1/3] [FLINK-36866] Fix unable to narrow casting on numeric values It's ridiculous if we can't even write `CAST(3.14 AS INT)`. Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> # Conflicts: # flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java --- .../flink/FlinkPipelineTransformITCase.java | 234 ++++++++++++++++++ .../functions/SystemFunctionUtils.java | 137 ++++++++-- .../transform/PostTransformOperatorTest.java | 5 +- 3 files changed, 358 insertions(+), 18 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 3810f426ae2..a45b3bcbca9 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; @@ -62,6 +63,7 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; @@ -70,6 +72,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; @@ -1603,6 +1606,170 @@ void testTransformWithCommentsAndDefaultExpr() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida, 26, extras], after=[], op=DELETE, meta=()}"); } + String[] runNumericCastingWith(String expression) throws Exception { + try { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId tableId = TableId.tableId("ns", "scm", "tbl"); + List events = generateNumericCastingEvents(tableId); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList( + new TransformDef( + "ns.scm.tbl", + expression, + null, + null, + null, + null, + null)), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + return outputEvents; + } finally { + outCaptor.reset(); + } + } + + // Generate a projection expression like CAST(tiny AS ) AS tiny, ... + private static String generateCastTo(String type) { + return "id, " + + Stream.of("tiny", "small", "int", "bigint", "float", "double", "decimal") + .map(col -> String.format("CAST(%s_c AS %s) AS %s_c", col, type, col)) + .collect(Collectors.joining(", ")); + } + + @Test + void testNumericCastingsWithTruncation() throws Exception { + assertThat(runNumericCastingWith("*")) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 2)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6.7, -8.9, -10.11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0.0, 0.0, 0.00], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6.7, 8.9, 10.11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("BOOLEAN"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` BOOLEAN}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, true, true, true, true, true, true, true], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, false, false, false, false, false, false, false], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, true, true, true, true, true, true, true], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("TINYINT"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` TINYINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("SMALLINT"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` SMALLINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("INT"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("BIGINT"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` BIGINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("FLOAT"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` FLOAT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("DOUBLE"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` DOUBLE}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.699999809265137, -8.9, -10.11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.699999809265137, 8.9, 10.11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("DECIMAL(1, 0)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("DECIMAL(2, 0)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("DECIMAL(3, 1)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + + assertThat(runNumericCastingWith(generateCastTo("DECIMAL(19, 10)"))) + .containsExactly( + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + } + private List generateSchemaEvolutionEvents(TableId tableId) { List events = new ArrayList<>(); @@ -1771,6 +1938,73 @@ private List generateSchemaEvolutionEvents(TableId tableId) { return events; } + private List generateNumericCastingEvents(TableId tableId) { + List events = new ArrayList<>(); + + // Initial schema + { + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("tiny_c", DataTypes.TINYINT()) + .physicalColumn("small_c", DataTypes.SMALLINT()) + .physicalColumn("int_c", DataTypes.INT()) + .physicalColumn("bigint_c", DataTypes.BIGINT()) + .physicalColumn("float_c", DataTypes.FLOAT()) + .physicalColumn("double_c", DataTypes.DOUBLE()) + .physicalColumn("decimal_c", DataTypes.DECIMAL(10, 2)) + .primaryKey("id") + .build(); + + events.add(new CreateTableEvent(tableId, schema)); + events.add( + DataChangeEvent.insertEvent( + tableId, + generate( + schema, + -1L, + (byte) -2, + (short) -3, + -4, + (long) -5, + -6.7f, + -8.9d, + DecimalData.fromBigDecimal(new BigDecimal("-10.11"), 10, 2)))); + events.add( + DataChangeEvent.insertEvent( + tableId, + generate( + schema, + 0L, + (byte) 0, + (short) 0, + 0, + (long) 0, + 0f, + 0d, + DecimalData.fromBigDecimal(BigDecimal.ZERO, 10, 2)))); + events.add( + DataChangeEvent.insertEvent( + tableId, + generate( + schema, + 1L, + (byte) 2, + (short) 3, + 4, + (long) 5, + 6.7f, + 8.9d, + DecimalData.fromBigDecimal(new BigDecimal("10.11"), 10, 2)))); + events.add( + DataChangeEvent.insertEvent( + tableId, + generate(schema, 2L, null, null, null, null, null, null, null))); + } + + return events; + } + void extractDataLines(String line) { if (!line.startsWith("DataChangeEvent{")) { return; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index 649b9b8cce7..09574496165 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -606,54 +606,141 @@ public static String castToString(Object object) { return object.toString(); } - public static Byte castToByte(Object object) { + public static Boolean castToBoolean(Object object) { if (object == null) { return null; + } else if (object instanceof Boolean) { + return (Boolean) object; + } else if (object instanceof Byte) { + return !object.equals((byte) 0); + } else if (object instanceof Short) { + return !object.equals((short) 0); + } else if (object instanceof Integer) { + return !object.equals(0); + } else if (object instanceof Long) { + return !object.equals(0L); + } else if (object instanceof Float) { + return !object.equals(0f); + } else if (object instanceof Double) { + return !object.equals(0d); + } else if (object instanceof BigDecimal) { + return ((BigDecimal) object).compareTo(BigDecimal.ZERO) != 0; } - return Byte.valueOf(castObjectIntoString(object)); + return Boolean.valueOf(castToString(object)); } - public static Boolean castToBoolean(Object object) { + public static Byte castToByte(Object object) { if (object == null) { return null; } - if (object instanceof Byte - || object instanceof Short - || object instanceof Integer - || object instanceof Long - || object instanceof Float - || object instanceof Double - || object instanceof BigDecimal) { - return !object.equals(0); + if (object instanceof Boolean) { + return (byte) ((Boolean) object ? 1 : 0); + } + if (object instanceof BigDecimal) { + return ((BigDecimal) object).byteValue(); + } + if (object instanceof Double) { + return ((Double) object).byteValue(); + } + if (object instanceof Float) { + return ((Float) object).byteValue(); + } + String stringRep = castToString(object); + try { + return Byte.valueOf(stringRep); + } catch (NumberFormatException e) { + return Double.valueOf(stringRep).byteValue(); } - return Boolean.valueOf(castToString(object)); } public static Short castToShort(Object object) { if (object == null) { return null; } - return Short.valueOf(castObjectIntoString(object)); + if (object instanceof Boolean) { + return (short) ((Boolean) object ? 1 : 0); + } + if (object instanceof BigDecimal) { + return ((BigDecimal) object).shortValue(); + } + if (object instanceof Double) { + return ((Double) object).shortValue(); + } + if (object instanceof Float) { + return ((Float) object).shortValue(); + } + String stringRep = castToString(object); + try { + return Short.valueOf(stringRep); + } catch (NumberFormatException e) { + return Double.valueOf(stringRep).shortValue(); + } } public static Integer castToInteger(Object object) { if (object == null) { return null; } - return Integer.valueOf(castObjectIntoString(object)); + if (object instanceof Boolean) { + return (Boolean) object ? 1 : 0; + } + if (object instanceof BigDecimal) { + return ((BigDecimal) object).intValue(); + } + if (object instanceof Double) { + return ((Double) object).intValue(); + } + if (object instanceof Float) { + return ((Float) object).intValue(); + } + String stringRep = castToString(object); + try { + return Integer.valueOf(stringRep); + } catch (NumberFormatException e) { + return Double.valueOf(stringRep).intValue(); + } } public static Long castToLong(Object object) { if (object == null) { return null; } - return Long.valueOf(castObjectIntoString(object)); + if (object instanceof Boolean) { + return (Boolean) object ? 1L : 0L; + } + if (object instanceof BigDecimal) { + return ((BigDecimal) object).longValue(); + } + if (object instanceof Double) { + return ((Double) object).longValue(); + } + if (object instanceof Float) { + return ((Float) object).longValue(); + } + String stringRep = castToString(object); + try { + return Long.valueOf(stringRep); + } catch (NumberFormatException e) { + return Double.valueOf(stringRep).longValue(); + } } public static Float castToFloat(Object object) { if (object == null) { return null; } + if (object instanceof Boolean) { + return (Boolean) object ? 1f : 0f; + } + if (object instanceof BigDecimal) { + return ((BigDecimal) object).floatValue(); + } + if (object instanceof Double) { + return ((Double) object).floatValue(); + } + if (object instanceof Float) { + return (Float) object; + } return Float.valueOf(castObjectIntoString(object)); } @@ -661,6 +748,18 @@ public static Double castToDouble(Object object) { if (object == null) { return null; } + if (object instanceof Boolean) { + return (Boolean) object ? 1d : 0d; + } + if (object instanceof BigDecimal) { + return ((BigDecimal) object).doubleValue(); + } + if (object instanceof Double) { + return (Double) object; + } + if (object instanceof Float) { + return ((Float) object).doubleValue(); + } return Double.valueOf(castObjectIntoString(object)); } @@ -668,9 +767,15 @@ public static BigDecimal castToBigDecimal(Object object, int precision, int scal if (object == null) { return null; } + if (object instanceof Boolean) { + object = (Boolean) object ? 1 : 0; + } BigDecimal bigDecimal = new BigDecimal(castObjectIntoString(object), new MathContext(precision)); - bigDecimal = bigDecimal.setScale(scale, BigDecimal.ROUND_HALF_UP); + bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP); + if (bigDecimal.precision() > precision) { + return null; + } return bigDecimal; } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index faf50a87966..94b190cf086 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -36,6 +36,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import java.math.BigDecimal; +import java.time.format.DateTimeParseException; /** Unit tests for the {@link PostTransformOperator}. */ public class PostTransformOperatorTest { @@ -1479,8 +1480,8 @@ void testCastErrorTransform() throws Exception { transform.processElement(new StreamRecord<>(insertEvent1)); }) .isExactlyInstanceOf(RuntimeException.class) - .hasRootCauseInstanceOf(NumberFormatException.class) - .hasRootCauseMessage("For input string: \"1.0\""); + .hasRootCauseInstanceOf(DateTimeParseException.class) + .hasRootCauseMessage("Text '1.0' could not be parsed at index 0"); } @Test From 0ed20c699721077aeac5773171f4c3f7cb657b65 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 11 Dec 2024 11:02:22 +0800 Subject: [PATCH 2/3] add TODO comments Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../apache/flink/cdc/runtime/functions/SystemFunctionUtils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index 09574496165..1ea2cdb7c26 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -774,6 +774,8 @@ public static BigDecimal castToBigDecimal(Object object, int precision, int scal new BigDecimal(castObjectIntoString(object), new MathContext(precision)); bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP); if (bigDecimal.precision() > precision) { + // TODO: Handle malformed data properly after dirty data handling is implemented in + // Pipeline Framework. return null; } return bigDecimal; From 67db5ef74c335b5feaa007234be2c4a349312e2d Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:34:19 +0800 Subject: [PATCH 3/3] fix: robust cast functions Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../flink/FlinkPipelineTransformITCase.java | 149 ++++++++++-------- .../functions/SystemFunctionUtils.java | 57 ++++++- 2 files changed, 133 insertions(+), 73 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index a45b3bcbca9..0c3fb5dca7e 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1666,7 +1666,16 @@ String[] runNumericCastingWith(String expression) throws Exception { // Generate a projection expression like CAST(tiny AS ) AS tiny, ... private static String generateCastTo(String type) { return "id, " - + Stream.of("tiny", "small", "int", "bigint", "float", "double", "decimal") + + Stream.of( + "tiny", + "small", + "int", + "bigint", + "float", + "double", + "decimal", + "valid_char", + "invalid_char") .map(col -> String.format("CAST(%s_c AS %s) AS %s_c", col, type, col)) .collect(Collectors.joining(", ")); } @@ -1675,99 +1684,99 @@ private static String generateCastTo(String type) { void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith("*")) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 2)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6.7, -8.9, -10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0.0, 0.0, 0.00], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6.7, 8.9, 10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 2),`valid_char_c` VARCHAR(17),`invalid_char_c` VARCHAR(17)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6.7, -8.9, -10.11, -12.13, foo], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0.0, 0.0, 0.00, 0, bar], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6.7, 8.9, 10.11, 12.13, baz], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("BOOLEAN"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` BOOLEAN}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, true, true, true, true, true, true, true], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, false, false, false, false, false, false, false], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, true, true, true, true, true, true, true], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` BOOLEAN,`valid_char_c` BOOLEAN,`invalid_char_c` BOOLEAN}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, true, true, true, true, true, true, true, false, false], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, false, false, false, false, false, false, false, false, false], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, true, true, true, true, true, true, true, false, false], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("TINYINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` TINYINT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` TINYINT,`valid_char_c` TINYINT,`invalid_char_c` TINYINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("SMALLINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` SMALLINT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` SMALLINT,`valid_char_c` SMALLINT,`invalid_char_c` SMALLINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("INT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT,`valid_char_c` INT,`invalid_char_c` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("BIGINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` BIGINT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` BIGINT,`valid_char_c` BIGINT,`invalid_char_c` BIGINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("FLOAT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` FLOAT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` FLOAT,`valid_char_c` FLOAT,`invalid_char_c` FLOAT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.11, -12.13, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.11, 12.13, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DOUBLE"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` DOUBLE}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.699999809265137, -8.9, -10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.699999809265137, 8.9, 10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` DOUBLE,`valid_char_c` DOUBLE,`invalid_char_c` DOUBLE}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.699999809265137, -8.9, -10.11, -12.13, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.699999809265137, 8.9, 10.11, 12.13, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DECIMAL(1, 0)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, null], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, null], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0),`valid_char_c` DECIMAL(1, 0),`invalid_char_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DECIMAL(2, 0)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0),`valid_char_c` DECIMAL(2, 0),`invalid_char_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DECIMAL(3, 1)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.1], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.1], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1),`valid_char_c` DECIMAL(3, 1),`invalid_char_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.1, -12.1, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.1, 12.1, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DECIMAL(19, 10)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10),`valid_char_c` DECIMAL(19, 10),`invalid_char_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000, -12.1300000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); } private List generateSchemaEvolutionEvents(TableId tableId) { @@ -1953,6 +1962,8 @@ private List generateNumericCastingEvents(TableId tableId) { .physicalColumn("float_c", DataTypes.FLOAT()) .physicalColumn("double_c", DataTypes.DOUBLE()) .physicalColumn("decimal_c", DataTypes.DECIMAL(10, 2)) + .physicalColumn("valid_char_c", DataTypes.VARCHAR(17)) + .physicalColumn("invalid_char_c", DataTypes.VARCHAR(17)) .primaryKey("id") .build(); @@ -1969,7 +1980,9 @@ private List generateNumericCastingEvents(TableId tableId) { (long) -5, -6.7f, -8.9d, - DecimalData.fromBigDecimal(new BigDecimal("-10.11"), 10, 2)))); + DecimalData.fromBigDecimal(new BigDecimal("-10.11"), 10, 2), + "-12.13", + "foo"))); events.add( DataChangeEvent.insertEvent( tableId, @@ -1982,7 +1995,9 @@ private List generateNumericCastingEvents(TableId tableId) { (long) 0, 0f, 0d, - DecimalData.fromBigDecimal(BigDecimal.ZERO, 10, 2)))); + DecimalData.fromBigDecimal(BigDecimal.ZERO, 10, 2), + "0", + "bar"))); events.add( DataChangeEvent.insertEvent( tableId, @@ -1995,11 +2010,15 @@ private List generateNumericCastingEvents(TableId tableId) { (long) 5, 6.7f, 8.9d, - DecimalData.fromBigDecimal(new BigDecimal("10.11"), 10, 2)))); + DecimalData.fromBigDecimal(new BigDecimal("10.11"), 10, 2), + "12.13", + "baz"))); events.add( DataChangeEvent.insertEvent( tableId, - generate(schema, 2L, null, null, null, null, null, null, null))); + generate( + schema, 2L, null, null, null, null, null, null, null, null, + null))); } return events; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index 1ea2cdb7c26..6f6d52a3b9d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -44,7 +44,10 @@ import static org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToDate; import static org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToTime; -/** System function utils to support the call of flink cdc pipeline transform. */ +/** + * System function utils to support the call of flink cdc pipeline transform.
+ * {@code castToXxx}-series function returns `null` when conversion is not viable. + */ public class SystemFunctionUtils { private static final Logger LOG = LoggerFactory.getLogger(SystemFunctionUtils.class); @@ -649,7 +652,13 @@ public static Byte castToByte(Object object) { try { return Byte.valueOf(stringRep); } catch (NumberFormatException e) { + // Ignore this exception because it could still represent a valid floating point number, + // but could not be accepted by Byte#valueOf. + } + try { return Double.valueOf(stringRep).byteValue(); + } catch (NumberFormatException ignored) { + return null; } } @@ -673,7 +682,13 @@ public static Short castToShort(Object object) { try { return Short.valueOf(stringRep); } catch (NumberFormatException e) { + // Ignore this exception because it could still represent a valid floating point number, + // but could not be accepted by Short#valueOf. + } + try { return Double.valueOf(stringRep).shortValue(); + } catch (NumberFormatException ignored) { + return null; } } @@ -697,7 +712,13 @@ public static Integer castToInteger(Object object) { try { return Integer.valueOf(stringRep); } catch (NumberFormatException e) { + // Ignore this exception because it could still represent a valid floating point number, + // but could not be accepted by Integer#valueOf. + } + try { return Double.valueOf(stringRep).intValue(); + } catch (NumberFormatException ignored) { + return null; } } @@ -721,7 +742,13 @@ public static Long castToLong(Object object) { try { return Long.valueOf(stringRep); } catch (NumberFormatException e) { + // Ignore this exception because it could still represent a valid floating point number, + // but could not be accepted by Long#valueOf. + } + try { return Double.valueOf(stringRep).longValue(); + } catch (NumberFormatException ignored) { + return null; } } @@ -741,7 +768,11 @@ public static Float castToFloat(Object object) { if (object instanceof Float) { return (Float) object; } - return Float.valueOf(castObjectIntoString(object)); + try { + return Float.valueOf(castObjectIntoString(object)); + } catch (NumberFormatException ignored) { + return null; + } } public static Double castToDouble(Object object) { @@ -760,7 +791,11 @@ public static Double castToDouble(Object object) { if (object instanceof Float) { return ((Float) object).doubleValue(); } - return Double.valueOf(castObjectIntoString(object)); + try { + return Double.valueOf(castObjectIntoString(object)); + } catch (NumberFormatException ignored) { + return null; + } } public static BigDecimal castToBigDecimal(Object object, int precision, int scale) { @@ -770,12 +805,18 @@ public static BigDecimal castToBigDecimal(Object object, int precision, int scal if (object instanceof Boolean) { object = (Boolean) object ? 1 : 0; } - BigDecimal bigDecimal = - new BigDecimal(castObjectIntoString(object), new MathContext(precision)); - bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP); + + BigDecimal bigDecimal; + try { + bigDecimal = new BigDecimal(castObjectIntoString(object), new MathContext(precision)); + bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP); + } catch (NumberFormatException ignored) { + return null; + } + + // If the precision overflows, null will be returned. Otherwise, we may accidentally emit a + // non-serializable object into the pipeline that breaks downstream. if (bigDecimal.precision() > precision) { - // TODO: Handle malformed data properly after dirty data handling is implemented in - // Pipeline Framework. return null; } return bigDecimal;