From b12b7c7021116c84fb3f003ff67cfb3e07a04692 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Thu, 9 Jan 2025 15:27:00 +0800 Subject: [PATCH 1/2] [hotfix] fix ci failure. --- .../flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 8c54837e8af..34dddb93fe1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -276,7 +276,8 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) // Each commit will generate one sequence number(equal to checkpointId). List expected = enableDeleteVector - ? Collections.singletonList(Row.ofKind(RowKind.INSERT, 3L)) + ? Arrays.asList( + Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L)) : Arrays.asList( Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 2L), From cb8acd2b7991569ff43da9ef055b4f2a9ba62cf2 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Thu, 9 Jan 2025 16:59:08 +0800 Subject: [PATCH 2/2] [hotfix] fix ci failure. --- .../cdc/composer/flink/FlinkPipelineComposerITCase.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 6d77bcfcd31..55a205d495a 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -1241,7 +1241,7 @@ void testTransformMergingWithRouteChangeOrder(ValuesDataSink.SinkApi sinkApi) th assertThat(mergedTableSchema) .isEqualTo( Schema.newBuilder() - .physicalColumn("id", DataTypes.BIGINT()) + .physicalColumn("id", DataTypes.BIGINT().notNull()) .physicalColumn("name", DataTypes.STRING()) .physicalColumn("age", DataTypes.INT()) .physicalColumn("last_name", DataTypes.STRING()) @@ -1252,9 +1252,9 @@ void testTransformMergingWithRouteChangeOrder(ValuesDataSink.SinkApi sinkApi) th String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}", "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}", - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name, null], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name, null], after=[2, Bob, 30, last_name, null], op=UPDATE, meta=()}",