From db7278958c283c255172d7da35edf1f8f4b1d273 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Wed, 7 Aug 2024 18:29:36 +0800 Subject: [PATCH 1/3] Transform rule support referencing one column more than once --- .../transform/ProjectionColumnProcessor.java | 8 ++- .../transform/TransformFilterProcessor.java | 6 +- .../transform/TransformDataOperatorTest.java | 62 +++++++++++++++++++ 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 6c5202342d8..9118d60ba05 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -30,6 +30,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; /** @@ -77,7 +78,9 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) { List params = new ArrayList<>(); List columns = tableInfo.getSchema().getColumns(); RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters(); - for (String originalColumnName : projectionColumn.getOriginalColumnNames()) { + LinkedHashSet originalColumnNames = + new LinkedHashSet<>(projectionColumn.getOriginalColumnNames()); + for (String originalColumnName : originalColumnNames) { if (originalColumnName.equals(TransformParser.DEFAULT_NAMESPACE_NAME)) { params.add(tableInfo.getNamespace()); continue; @@ -110,7 +113,8 @@ private TransformExpressionKey generateTransformExpressionKey() { List> paramTypes = new ArrayList<>(); List columns = tableInfo.getSchema().getColumns(); String scriptExpression = projectionColumn.getScriptExpression(); - List originalColumnNames = projectionColumn.getOriginalColumnNames(); + LinkedHashSet originalColumnNames = + new LinkedHashSet<>(projectionColumn.getOriginalColumnNames()); for (String originalColumnName : originalColumnNames) { for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 7eaefc3c84b..b2f78a5be71 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -30,6 +30,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; /** The processor of the transform filter. It processes the data change event of matched table. */ @@ -72,7 +73,8 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) { List params = new ArrayList<>(); List columns = tableInfo.getSchema().getColumns(); RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters(); - for (String columnName : transformFilter.getColumnNames()) { + LinkedHashSet columnNames = new LinkedHashSet<>(transformFilter.getColumnNames()); + for (String columnName : columnNames) { if (columnName.equals(TransformParser.DEFAULT_NAMESPACE_NAME)) { params.add(tableInfo.getNamespace()); continue; @@ -105,7 +107,7 @@ private TransformExpressionKey generateTransformExpressionKey() { List> paramTypes = new ArrayList<>(); List columns = tableInfo.getSchema().getColumns(); String scriptExpression = transformFilter.getScriptExpression(); - List columnNames = transformFilter.getColumnNames(); + LinkedHashSet columnNames = new LinkedHashSet<>(transformFilter.getColumnNames()); for (String columnName : columnNames) { for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java index a19722ef156..5f133f172d0 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java @@ -170,6 +170,16 @@ public class TransformDataOperatorTest { .primaryKey("col1") .build(); + private static final TableId COLUMN_SQUARE_TABLE = + TableId.tableId("my_company", "my_branch", "column_square"); + private static final Schema COLUMN_SQUARE_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.INT()) + .physicalColumn("col2", DataTypes.INT()) + .physicalColumn("square_col2", DataTypes.INT()) + .primaryKey("col1") + .build(); + @Test void testDataChangeEventTransform() throws Exception { TransformDataOperator transform = @@ -504,6 +514,58 @@ void testMetadataASTransform() throws Exception { .isEqualTo(new StreamRecord<>(insertEventExpect)); } + @Test + void testDataChangeEventTransformWithDuplicateColumns() throws Exception { + TransformDataOperator transform = + TransformDataOperator.newBuilder() + .addTransform( + COLUMN_SQUARE_TABLE.identifier(), + "col1, col2, col2 * col2 as square_col2", + "col2 < 3 OR col2 > 5") + .build(); + EventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) COLUMN_SQUARE_SCHEMA.toRowDataType())); + // Insert + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {1, 1, null})); + DataChangeEvent insertEventExpect = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {1, 1, 1})); + + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {6, 6, null})); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {6, 6, 36})); + + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent(COLUMN_SQUARE_TABLE, COLUMN_SQUARE_SCHEMA))); + transform.processElement(new StreamRecord<>(insertEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect)); + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + } + @Test void testTimestampTransform() throws Exception { TransformDataOperator transform = From 9ea3553083c50be339d66a29f9f4611a3e352a8e Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Wed, 7 Aug 2024 19:59:30 +0800 Subject: [PATCH 2/3] fixed comment --- .../operators/transform/TransformDataOperatorTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java index 5f133f172d0..7841f316438 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java @@ -550,6 +550,11 @@ void testDataChangeEventTransformWithDuplicateColumns() throws Exception { DataChangeEvent.insertEvent( COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new Object[] {6, 6, 36})); + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + COLUMN_SQUARE_TABLE, + recordDataGenerator.generate(new Object[] {4, 4, null})); + transform.processElement(new StreamRecord<>(createTableEvent)); Assertions.assertThat( transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) @@ -564,6 +569,10 @@ void testDataChangeEventTransformWithDuplicateColumns() throws Exception { Assertions.assertThat( transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) .isEqualTo(new StreamRecord<>(insertEventExpect2)); + transform.processElement(new StreamRecord<>(insertEvent3)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isNull(); } @Test From 2fbebd8a7853636a4208466c358085b8bf4fd74d Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Fri, 9 Aug 2024 17:15:15 +0800 Subject: [PATCH 3/3] Resolve conflicts --- .../operators/transform/TransformFilterProcessor.java | 6 ++---- .../operators/transform/PostTransformOperatorTest.java | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 04dbf0ed79e..6f36edc66ff 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -83,10 +83,8 @@ private Tuple2, List>> generateArguments() { for (String columnName : columnNames) { for (Column column : columns) { if (column.getName().equals(columnName)) { - if (!argNames.contains(columnName)) { - argNames.add(columnName); - argTypes.add(DataTypeConverter.convertOriginalClass(column.getType())); - } + argNames.add(columnName); + argTypes.add(DataTypeConverter.convertOriginalClass(column.getType())); break; } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index ef229060fc7..067842c313a 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -572,14 +572,14 @@ void testMetadataASTransform() throws Exception { @Test void testDataChangeEventTransformWithDuplicateColumns() throws Exception { - TransformDataOperator transform = - TransformDataOperator.newBuilder() + PostTransformOperator transform = + PostTransformOperator.newBuilder() .addTransform( COLUMN_SQUARE_TABLE.identifier(), "col1, col2, col2 * col2 as square_col2", "col2 < 3 OR col2 > 5") .build(); - EventOperatorTestHarness + EventOperatorTestHarness transformFunctionEventEventOperatorTestHarness = new EventOperatorTestHarness<>(transform, 1); // Initialization