From 17e5cf93035b53af4eb2b612f1f57fde6e16902d Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Sun, 28 Apr 2024 14:44:41 +0800
Subject: [PATCH] [FLINK-35256][runtime] Fix transform node does not respect type nullability

---
 .../cdc/runtime/parser/TransformParser.java   | 21 ++++++--
 .../TransformSchemaOperatorTest.java          | 49 +++++++++++++++++++
 2 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index d89e0e359c6..e571cd66abe 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -19,6 +19,7 @@
 
 import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
@@ -154,6 +155,12 @@ public static List<ProjectionColumn> generateProjectionColumns(
                         .collect(
                                 Collectors.toMap(
                                         RelDataTypeField::getName, RelDataTypeField::getType));
+
+        Map<String, Boolean> isNotNullMap =
+                columns.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Column::getName, column -> !column.getType().isNullable()));
         List<ProjectionColumn> projectionColumns = new ArrayList<>();
         for (SqlNode sqlNode : sqlSelect.getSelectList()) {
             if (sqlNode instanceof SqlBasicCall) {
@@ -205,21 +212,27 @@ public static List<ProjectionColumn> generateProjectionColumns(
             } else if (sqlNode instanceof SqlIdentifier) {
                 SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
                 String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
+                DataType columnType =
+                        DataTypeConverter.convertCalciteRelDataTypeToDataType(
+                                relDataTypeMap.get(columnName));
                 if (isMetadataColumn(columnName)) {
                     projectionColumns.add(
                             ProjectionColumn.of(
                                     columnName,
-                                    DataTypeConverter.convertCalciteRelDataTypeToDataType(
-                                            relDataTypeMap.get(columnName)),
+                                    // Metadata columns should never be null
+                                    columnType.notNull(),
                                     columnName,
                                     columnName,
                                     Arrays.asList(columnName)));
                 } else {
+                    // Calcite translated column type doesn't keep nullability.
+                    // Appending it manually to circumvent this problem.
                     projectionColumns.add(
                             ProjectionColumn.of(
                                     columnName,
-                                    DataTypeConverter.convertCalciteRelDataTypeToDataType(
-                                            relDataTypeMap.get(columnName))));
+                                    isNotNullMap.get(columnName)
+                                            ? columnType.notNull()
+                                            : columnType.nullable()));
                 }
             } else {
                 throw new ParseException("Unrecognized projection: " + sqlNode.toString());
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperatorTest.java
index 9e273024bde..82992a64e6b 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperatorTest.java
@@ -74,6 +74,26 @@ public class TransformSchemaOperatorTest {
                     .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
                     .build();
 
+    private static final Schema NULLABILITY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .partitionKey("id")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
+    private static final Schema EXPECTED_NULLABILITY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.STRING().notNull())
+                    .physicalColumn("uid", DataTypes.STRING())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("uname", DataTypes.STRING())
+                    .primaryKey("id")
+                    .partitionKey("id")
+                    .options(ImmutableMap.of("key1", "value1", "key2", "value2"))
+                    .build();
+
     @Test
     void testEventTransform() throws Exception {
         TransformSchemaOperator transform =
@@ -176,4 +196,33 @@ void testEventTransform() throws Exception {
                         transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
                 .isEqualTo(new StreamRecord<>(updateEventExpect));
     }
+
+    @Test
+    public void testNullabilityColumn() throws Exception {
+        TransformSchemaOperator transform =
+                TransformSchemaOperator.newBuilder()
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "id, upper(id) uid, name, upper(name) uname",
+                                "id",
+                                "id",
+                                "key1=value1,key2=value2")
+                        .build();
+        EventOperatorTestHarness<TransformSchemaOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
+        transform.processElement(new StreamRecord<>(createTableEvent));
+
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
+    }
 }