From 4af4f3f1dbd23db763a4f3c6ea21be2934b80e56 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 30 Oct 2024 15:41:46 +0800 Subject: [PATCH 01/12] [FLINK-36193] Supports applying TRUNCATE / DROP table ddl to Paimon, StarRocks, and Doris Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> # Conflicts: # flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java # flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java # Conflicts: # flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java --- .../doris/sink/DorisMetadataApplier.java | 43 +- .../doris/sink/DorisSchemaChangeManager.java | 47 ++ .../sink/DorisMetadataApplierITCase.java | 91 +++- .../paimon/sink/PaimonMetadataApplier.java | 56 ++- .../paimon/sink/v2/PaimonSinkITCase.java | 449 ++++++++---------- .../starrocks/sink/StarRocksDataSink.java | 5 +- .../sink/StarRocksEnrichedCatalog.java | 107 +++++ .../sink/StarRocksMetadataApplier.java | 37 +- .../starrocks/sink/MockStarRocksCatalog.java | 2 +- .../sink/StarRocksMetadataApplierITCase.java | 85 +++- .../pipeline/tests/MySqlToDorisE2eITCase.java | 253 +++++++++- 11 files changed, 875 insertions(+), 300 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index cad3b37fc1c..ccf6d879895 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -22,13 +22,14 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; -import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -47,7 +48,6 @@ import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisOptions; -import org.apache.doris.flink.sink.schema.SchemaChangeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,20 +61,22 @@ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX; /** Supports {@link DorisDataSink} to schema evolution. */ public class DorisMetadataApplier implements MetadataApplier { private static final Logger LOG = LoggerFactory.getLogger(DorisMetadataApplier.class); private DorisOptions dorisOptions; - private SchemaChangeManager schemaChangeManager; + private DorisSchemaChangeManager schemaChangeManager; private Configuration config; private Set enabledSchemaEvolutionTypes; public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) { this.dorisOptions = dorisOptions; - this.schemaChangeManager = new SchemaChangeManager(dorisOptions); + this.schemaChangeManager = new DorisSchemaChangeManager(dorisOptions); this.config = config; this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); } @@ -93,7 +95,13 @@ public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEven @Override public Set getSupportedSchemaEvolutionTypes() { - return Sets.newHashSet(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN); + return Sets.newHashSet( + ADD_COLUMN, + ALTER_COLUMN_TYPE, + DROP_COLUMN, + DROP_TABLE, + RENAME_COLUMN, + TRUNCATE_TABLE); } @Override @@ -117,14 +125,16 @@ public void applySchemaChange(SchemaChangeEvent event) { return null; }, dropTableEvent -> { - throw new UnsupportedSchemaChangeEventException(event); + applyDropTableEvent(dropTableEvent); + return null; }, renameColumnEvent -> { applyRenameColumnEvent(renameColumnEvent); return null; }, truncateTableEvent -> { - throw new UnsupportedSchemaChangeEventException(event); + applyTruncateTableEvent(truncateTableEvent); + return null; }); } @@ -275,4 +285,23 @@ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event) throw new SchemaEvolveException(event, "fail to apply alter column type event", e); } } + + private void applyTruncateTableEvent(TruncateTableEvent truncateTableEvent) + throws SchemaEvolveException { + TableId tableId = truncateTableEvent.tableId(); + try { + schemaChangeManager.truncateTable(tableId.getSchemaName(), tableId.getTableName()); + } catch (Exception e) { + throw new SchemaEvolveException(truncateTableEvent, "fail to truncate table", e); + } + } + + private void applyDropTableEvent(DropTableEvent dropTableEvent) throws SchemaEvolveException { + TableId tableId = dropTableEvent.tableId(); + try { + schemaChangeManager.dropTable(tableId.getSchemaName(), tableId.getTableName()); + } catch (Exception e) { + throw new SchemaEvolveException(dropTableEvent, "fail to drop table", e); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java new file mode 100644 index 00000000000..d3b7ca5a07a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.doris.sink; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.sink.schema.SchemaChangeManager; + +import java.io.IOException; + +import static org.apache.doris.flink.catalog.doris.DorisSystem.identifier; + +/** An enriched version of Doris' {@link SchemaChangeManager}. */ +public class DorisSchemaChangeManager extends SchemaChangeManager { + public DorisSchemaChangeManager(DorisOptions dorisOptions) { + super(dorisOptions); + } + + public boolean truncateTable(String databaseName, String tableName) + throws IOException, IllegalArgumentException { + String createTableDDL = + "TRUNCATE TABLE " + identifier(databaseName) + "." + identifier(tableName); + return this.execute(createTableDDL, databaseName); + } + + public boolean dropTable(String databaseName, String tableName) + throws IOException, IllegalArgumentException { + String createTableDDL = + "DROP TABLE " + identifier(databaseName) + "." + identifier(tableName); + return this.execute(createTableDDL, databaseName); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index 8aa0c542243..7aa0b400c38 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -20,18 +20,24 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.PhysicalColumn; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; @@ -39,6 +45,7 @@ import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -47,10 +54,12 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import java.sql.SQLSyntaxErrorException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -429,8 +438,76 @@ public void testDorisNarrowingAlterColumnType() throws Exception { runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId)); } + @Test + public void testDorisTruncateTable() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + List truncateTableTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob")), + new TruncateTableEvent(tableId), + DataChangeEvent.insertEvent(tableId, generate(schema, 3, 4.5, "Cecily")), + DataChangeEvent.insertEvent(tableId, generate(schema, 4, 5.6, "Derrida"))); + runJobWithEvents(truncateTableTestingEvents); + + assertEqualsInAnyOrder( + Arrays.asList("3 | 4.5 | Cecily", "4 | 5.6 | Derrida"), + fetchTableContent(tableId, 3)); + } + + @Test + public void testDorisDropTable() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + List preparationTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob"))); + runJobWithEvents(preparationTestingEvents); + + assertEqualsInAnyOrder( + Arrays.asList("1 | 2.3 | Alice", "2 | 3.4 | Bob"), fetchTableContent(tableId, 3)); + + runJobWithEvents( + Arrays.asList(new CreateTableEvent(tableId, schema), new DropTableEvent(tableId))); + + SQLSyntaxErrorException thrown = + Assertions.assertThrows( + SQLSyntaxErrorException.class, () -> fetchTableContent(tableId, 3)); + Assertions.assertTrue( + thrown.getMessage() + .contains( + String.format( + "errCode = 2, detailMessage = Unknown table '%s'", + tableId.getTableName()))); + } + private void runJobWithEvents(List events) throws Exception { - DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class)); + DataStream stream = + env.fromCollection(events, TypeInformation.of(Event.class)).setParallelism(1); Configuration config = new Configuration() @@ -477,4 +554,16 @@ private void runJobWithEvents(List events) throws Exception { env.execute("Doris Schema Evolution Test"); } + + BinaryRecordData generate(Schema schema, Object... fields) { + return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) + .generate( + Arrays.stream(fields) + .map( + e -> + (e instanceof String) + ? BinaryStringData.fromString((String) e) + : e) + .toArray()); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 928e7b6dfcb..f8c38cd35f7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -21,13 +21,14 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; -import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.common.types.utils.DataTypeUtils; @@ -41,6 +42,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,14 +139,16 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) return null; }, dropTableEvent -> { - throw new UnsupportedSchemaChangeEventException(dropTableEvent); + applyDropTable(dropTableEvent); + return null; }, renameColumnEvent -> { applyRenameColumn(renameColumnEvent); return null; }, truncateTableEvent -> { - throw new UnsupportedSchemaChangeEventException(truncateTableEvent); + applyTruncateTable(truncateTableEvent); + return null; }); } @@ -180,10 +184,7 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti .primaryKey(primaryKeys) .options(tableOptions) .options(schema.options()); - catalog.createTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - builder.build(), - true); + catalog.createTable(tableIdToIdentifier(event), builder.build(), true); } catch (Catalog.TableAlreadyExistException | Catalog.DatabaseNotExistException | Catalog.DatabaseAlreadyExistException e) { @@ -194,10 +195,7 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti private void applyAddColumn(AddColumnEvent event) throws SchemaEvolveException { try { List tableChangeList = applyAddColumnEventWithPosition(event); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { @@ -286,10 +284,7 @@ private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException List tableChangeList = new ArrayList<>(); event.getDroppedColumnNames() .forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { @@ -305,10 +300,7 @@ private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveExcep (oldName, newName) -> tableChangeList.add( SchemaChangeProvider.rename(oldName, newName))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { @@ -325,14 +317,32 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv tableChangeList.add( SchemaChangeProvider.updateColumnType( oldName, newType))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { throw new SchemaEvolveException(event, e.getMessage(), e); } } + + private void applyTruncateTable(TruncateTableEvent event) throws SchemaEvolveException { + try (BatchTableCommit batchTableCommit = + catalog.getTable(tableIdToIdentifier(event)).newBatchWriteBuilder().newCommit()) { + batchTableCommit.truncateTable(); + } catch (Exception e) { + throw new SchemaEvolveException(event, "Failed to apply truncate table event", e); + } + } + + private void applyDropTable(DropTableEvent event) throws SchemaEvolveException { + try { + catalog.dropTable(tableIdToIdentifier(event), true); + } catch (Catalog.TableNotExistException e) { + throw new SchemaEvolveException(event, "Failed to apply drop table event", e); + } + } + + private static Identifier tableIdToIdentifier(SchemaChangeEvent event) { + return new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 34dddb93fe1..44b3aa70bd6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -26,17 +26,21 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; -import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; @@ -51,11 +55,12 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.UserCodeClassLoader; +import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.sink.MultiTableCommittable; import org.apache.paimon.options.Options; -import org.junit.jupiter.api.Assertions; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -75,6 +80,8 @@ import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.flink.cdc.common.types.DataTypes.STRING; + /** An ITCase for {@link PaimonWriter} and {@link PaimonCommitter}. */ public class PaimonSinkITCase { @@ -86,9 +93,8 @@ public class PaimonSinkITCase { private String warehouse; - private TableId table1; - - private BinaryRecordDataGenerator generator; + private final TableId table1 = TableId.tableId("test", "table1"); + private final TableId table2 = TableId.tableId("test", "table2"); private static int checkpointId = 1; @@ -115,7 +121,6 @@ private void initialize(String metastore) catalogOptions.setString("metastore", metastore); catalogOptions.setString("warehouse", warehouse); catalogOptions.setString("cache-enabled", "false"); - table1 = TableId.tableId("test", "table1"); if ("hive".equals(metastore)) { catalogOptions.setString("hadoop-conf-dir", HADOOP_CONF_DIR); catalogOptions.setString("hive-conf-dir", HIVE_CONF_DIR); @@ -145,8 +150,8 @@ private List createTestEvents(boolean enableDeleteVectors) throws SchemaE // create table Schema schema = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) - .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col1", STRING()) + .physicalColumn("col2", STRING()) .primaryKey("col1") .option("bucket", "1") .option("deletion-vectors.enabled", String.valueOf(enableDeleteVectors)) @@ -156,27 +161,13 @@ private List createTestEvents(boolean enableDeleteVectors) throws SchemaE PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); metadataApplier.applySchemaChange(createTableEvent); - generator = - new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING())); // insert - DataChangeEvent insertEvent1 = - DataChangeEvent.insertEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("1"), - BinaryStringData.fromString("1") - })); - testEvents.add(insertEvent1); - DataChangeEvent insertEvent2 = - DataChangeEvent.insertEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("2"), - BinaryStringData.fromString("2") - })); - testEvents.add(insertEvent2); + testEvents.add( + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "1"), Tuple2.of(STRING(), "1")))); + testEvents.add( + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "2")))); return testEvents; } @@ -193,96 +184,49 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) Committer committer = paimonSink.createCommitter(); // insert - for (Event event : createTestEvents(enableDeleteVector)) { - writer.write(event, null); - } - writer.flush(false); - Collection> commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - List result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")), - result); + writeAndCommit(writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); + Assertions.assertThat(fetchResults(table1)) + .isEqualTo( + Arrays.asList( + Row.ofKind(RowKind.INSERT, "1", "1"), + Row.ofKind(RowKind.INSERT, "2", "2"))); // delete - Event event = - DataChangeEvent.deleteEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("1"), - BinaryStringData.fromString("1") - })); - writer.write(event, null); - writer.flush(false); - commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "2")), result); + writeAndCommit( + writer, + committer, + generateDelete( + table1, Arrays.asList(Tuple2.of(STRING(), "1"), Tuple2.of(STRING(), "1")))); + + Assertions.assertThat(fetchResults(table1)) + .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "2"))); // update - event = - DataChangeEvent.updateEvent( + writeAndCommit( + writer, + committer, + generateUpdate( table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("2"), - BinaryStringData.fromString("2") - }), - generator.generate( - new Object[] { - BinaryStringData.fromString("2"), - BinaryStringData.fromString("x") - })); - writer.write(event, null); - writer.flush(false); - commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "x")), result); + Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "2")), + Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "x")))); + Assertions.assertThat(fetchResults(table1)) + .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "x"))); - result = new ArrayList<>(); - tEnv.sqlQuery("select max_sequence_number from paimon_catalog.test.`table1$files`") - .execute() - .collect() - .forEachRemaining(result::add); - // Each commit will generate one sequence number(equal to checkpointId). - List expected = - enableDeleteVector - ? Arrays.asList( - Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L)) - : Arrays.asList( - Row.ofKind(RowKind.INSERT, 1L), - Row.ofKind(RowKind.INSERT, 2L), - Row.ofKind(RowKind.INSERT, 3L)); - Assertions.assertEquals(expected, result); + if (enableDeleteVector) { + Assertions.assertThat(fetchResults( + TableId.tableId("test", "`table1$files`") + )).containsExactly( + Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L) + ); + } else { + Assertions.assertThat(fetchResults( + TableId.tableId("test", "`table1$files`") + )).containsExactly( + Row.ofKind(RowKind.INSERT, 1L), + Row.ofKind(RowKind.INSERT, 2L), + Row.ofKind(RowKind.INSERT, 3L) + ); + } } @ParameterizedTest @@ -292,142 +236,102 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto Catalog.DatabaseNotExistException, SchemaEvolveException { initialize(metastore); PaimonSink paimonSink = - new PaimonSink( + new PaimonSink<>( catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); Committer committer = paimonSink.createCommitter(); // 1. receive only DataChangeEvents during one checkpoint - for (Event event : createTestEvents(enableDeleteVector)) { - writer.write(event, null); - } - writer.flush(false); - Collection> commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - List result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")), - result); + writeAndCommit(writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); + Assertions.assertThat(fetchResults(table1)) + .isEqualTo( + Arrays.asList( + Row.ofKind(RowKind.INSERT, "1", "1"), + Row.ofKind(RowKind.INSERT, "2", "2"))); // 2. receive DataChangeEvents and SchemaChangeEvents during one checkpoint - DataChangeEvent insertEvent3 = - DataChangeEvent.insertEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("3"), - BinaryStringData.fromString("3") - })); - writer.write(insertEvent3, null); - writer.flush(false); + writeAndCommit( + writer, + committer, + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "3"), Tuple2.of(STRING(), "3")))); // add column AddColumnEvent.ColumnWithPosition columnWithPosition = - new AddColumnEvent.ColumnWithPosition( - Column.physicalColumn("col3", DataTypes.STRING())); + new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("col3", STRING())); AddColumnEvent addColumnEvent = new AddColumnEvent(table1, Collections.singletonList(columnWithPosition)); PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); metadataApplier.applySchemaChange(addColumnEvent); - writer.write(addColumnEvent, null); - generator = - new BinaryRecordDataGenerator( - RowType.of(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING())); - DataChangeEvent insertEvent4 = - DataChangeEvent.insertEvent( + + writeAndCommit( + writer, + committer, + generateInsert( table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("4"), - BinaryStringData.fromString("4"), - BinaryStringData.fromString("4") - })); - writer.write(insertEvent4, null); - writer.flush(false); - commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1", null), - Row.ofKind(RowKind.INSERT, "2", "2", null), - Row.ofKind(RowKind.INSERT, "3", "3", null), - Row.ofKind(RowKind.INSERT, "4", "4", "4")), - result); + Arrays.asList( + Tuple2.of(STRING(), "4"), + Tuple2.of(STRING(), "4"), + Tuple2.of(STRING(), "4")))); + + Assertions.assertThat(fetchResults(table1)) + .isEqualTo( + Arrays.asList( + Row.ofKind(RowKind.INSERT, "1", "1", null), + Row.ofKind(RowKind.INSERT, "2", "2", null), + Row.ofKind(RowKind.INSERT, "3", "3", null), + Row.ofKind(RowKind.INSERT, "4", "4", "4"))); // 2. receive DataChangeEvents and SchemaChangeEvents during one checkpoint - DataChangeEvent insertEvent5 = - DataChangeEvent.insertEvent( + writeAndCommit( + writer, + committer, + generateInsert( table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("5"), - BinaryStringData.fromString("5"), - BinaryStringData.fromString("5") - })); - writer.write(insertEvent5, null); - writer.flush(false); + Arrays.asList( + Tuple2.of(STRING(), "5"), + Tuple2.of(STRING(), "5"), + Tuple2.of(STRING(), "5")))); + // drop column DropColumnEvent dropColumnEvent = new DropColumnEvent(table1, Collections.singletonList("col2")); metadataApplier.applySchemaChange(dropColumnEvent); writer.write(dropColumnEvent, null); - generator = - new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING())); - DataChangeEvent insertEvent6 = - DataChangeEvent.insertEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("6"), - BinaryStringData.fromString("6") - })); - writer.write(insertEvent6, null); - writer.flush(false); - commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", null), - Row.ofKind(RowKind.INSERT, "2", null), - Row.ofKind(RowKind.INSERT, "3", null), - Row.ofKind(RowKind.INSERT, "4", "4"), - Row.ofKind(RowKind.INSERT, "5", "5"), - Row.ofKind(RowKind.INSERT, "6", "6")), - result); - result = new ArrayList<>(); - tEnv.sqlQuery("select min_sequence_number from paimon_catalog.test.`table1$files`") - .execute() - .collect() - .forEachRemaining(result::add); - Set deduplicated = new HashSet<>(result); - Assertions.assertEquals(result.size(), deduplicated.size()); + + writeAndCommit( + writer, + committer, + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "6"), Tuple2.of(STRING(), "6")))); + + Assertions.assertThat(fetchResults(table1)) + .isEqualTo( + Arrays.asList( + Row.ofKind(RowKind.INSERT, "1", null), + Row.ofKind(RowKind.INSERT, "2", null), + Row.ofKind(RowKind.INSERT, "3", null), + Row.ofKind(RowKind.INSERT, "4", "4"), + Row.ofKind(RowKind.INSERT, "5", "5"), + Row.ofKind(RowKind.INSERT, "6", "6"))); + + TruncateTableEvent truncateTableEvent = new TruncateTableEvent(table1); + metadataApplier.applySchemaChange(truncateTableEvent); + + writeAndCommit( + writer, + committer, + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "7"), Tuple2.of(STRING(), "7")))); + + Assertions.assertThat(fetchResults(table1)) + .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "7", "7"))); + + DropTableEvent dropTableEvent = new DropTableEvent(table1); + metadataApplier.applySchemaChange(dropTableEvent); + Assertions.assertThatThrownBy(() -> fetchResults(table1)) + .hasRootCauseExactlyInstanceOf(SqlValidatorException.class) + .hasRootCauseMessage("Object 'table1' not found within 'paimon_catalog.test'"); } @ParameterizedTest @@ -443,11 +347,10 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector Committer committer = paimonSink.createCommitter(); List testEvents = createTestEvents(enableDeleteVector); // create table - TableId table2 = TableId.tableId("test", "table2"); Schema schema = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) - .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col1", STRING()) + .physicalColumn("col2", STRING()) .primaryKey("col1") .option("bucket", "1") .build(); @@ -456,43 +359,79 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); metadataApplier.applySchemaChange(createTableEvent); // insert - DataChangeEvent insertEvent1 = - DataChangeEvent.insertEvent( - table2, - generator.generate( - new Object[] { - BinaryStringData.fromString("1"), - BinaryStringData.fromString("1") - })); - testEvents.add(insertEvent1); + testEvents.add( + generateInsert( + table2, Arrays.asList(Tuple2.of(STRING(), "1"), Tuple2.of(STRING(), "1")))); // insert - for (Event event : testEvents) { - writer.write(event, null); - } - writer.flush(false); + writeAndCommit(writer, committer, testEvents.toArray(new Event[0])); + + Assertions.assertThat(fetchResults(table1)) + .isEqualTo( + Arrays.asList( + Row.ofKind(RowKind.INSERT, "1", "1"), + Row.ofKind(RowKind.INSERT, "2", "2"))); + Assertions.assertThat(fetchResults(table2)) + .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1"))); + } + + private static void commit( + PaimonWriter writer, Committer committer) + throws IOException, InterruptedException { Collection> commitRequests = writer.prepareCommit().stream() .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); - List result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")), - result); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table2") + } + + private static void writeAndCommit( + PaimonWriter writer, Committer committer, Event... events) + throws IOException, InterruptedException { + for (Event event : events) { + writer.write(event, null); + } + writer.flush(false); + commit(writer, committer); + } + + private List fetchResults(TableId tableId) { + List results = new ArrayList<>(); + tEnv.sqlQuery("select * from paimon_catalog." + tableId.toString()) .execute() .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result); + .forEachRemaining(results::add); + return results; + } + + private BinaryRecordData generate(List> elements) { + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + RowType.of(elements.stream().map(e -> e.f0).toArray(DataType[]::new))); + return generator.generate( + elements.stream() + .map(e -> e.f1) + .map(o -> o instanceof String ? BinaryStringData.fromString((String) o) : o) + .toArray(Object[]::new)); + } + + private DataChangeEvent generateInsert( + TableId tableId, List> elements) { + return DataChangeEvent.insertEvent(tableId, generate(elements)); + } + + private DataChangeEvent generateUpdate( + TableId tableId, + List> beforeElements, + List> afterElements) { + return DataChangeEvent.updateEvent( + tableId, generate(beforeElements), generate(afterElements)); + } + + private DataChangeEvent generateDelete( + TableId tableId, List> elements) { + return DataChangeEvent.deleteEvent(tableId, generate(elements)); } @ParameterizedTest diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java index 9811a010dc2..24bd943556e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java @@ -24,7 +24,6 @@ import org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.common.sink.MetadataApplier; -import com.starrocks.connector.flink.catalog.StarRocksCatalog; import com.starrocks.connector.flink.table.sink.SinkFunctionFactory; import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import com.starrocks.connector.flink.table.sink.v2.StarRocksSink; @@ -72,8 +71,8 @@ public EventSinkProvider getEventSinkProvider() { @Override public MetadataApplier getMetadataApplier() { - StarRocksCatalog catalog = - new StarRocksCatalog( + StarRocksEnrichedCatalog catalog = + new StarRocksEnrichedCatalog( sinkOptions.getJdbcUrl(), sinkOptions.getUsername(), sinkOptions.getPassword()); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java new file mode 100644 index 00000000000..24f1f444cf2 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.starrocks.sink; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.starrocks.connector.flink.catalog.StarRocksCatalog; +import com.starrocks.connector.flink.catalog.StarRocksCatalogException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */ +public class StarRocksEnrichedCatalog extends StarRocksCatalog { + public StarRocksEnrichedCatalog(String jdbcUrl, String username, String password) { + super(jdbcUrl, username, password); + } + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksEnrichedCatalog.class); + + public void truncateTable(String databaseName, String tableName) + throws StarRocksCatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "Table name cannot be null or empty."); + String alterSql = this.buildTruncateTableSql(databaseName, tableName); + try { + // TRUNCATE TABLE is not regarded as a column-based schema change for StarRocks, so + // there's no need to check the evolution state. + executeUpdateStatement(alterSql); + } catch (Exception e) { + LOG.error( + "Failed to truncate table `{}`.`{}`. SQL executed: {}", + databaseName, + tableName, + alterSql); + throw new StarRocksCatalogException( + String.format("Failed to truncate table `%s`.`%s`.", databaseName, tableName), + e); + } + } + + public void dropTable(String databaseName, String tableName) throws StarRocksCatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "Table name cannot be null or empty."); + String alterSql = this.buildDropTableSql(databaseName, tableName); + try { + // like TRUNCATE TABLE, DROP TABLE isn't a column-affecting operation and `executeAlter` + // method isn't appropriate. + executeUpdateStatement(alterSql); + } catch (Exception e) { + LOG.error( + "Failed to drop table `{}`.`{}`. SQL executed: {}", + databaseName, + tableName, + alterSql); + throw new StarRocksCatalogException( + String.format("Failed to drop table `%s`.`%s`.", databaseName, tableName), e); + } + } + + private String buildTruncateTableSql(String databaseName, String tableName) { + return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName); + } + + private String buildDropTableSql(String databaseName, String tableName) { + return String.format("DROP TABLE `%s`.`%s`;", databaseName, tableName); + } + + private void executeUpdateStatement(String sql) throws StarRocksCatalogException { + try { + Method m = + getClass() + .getSuperclass() + .getDeclaredMethod("executeUpdateStatement", String.class); + m.setAccessible(true); + m.invoke(this, sql); + } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java index 25bb2656c68..4204dbf9c9e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java @@ -21,10 +21,12 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; @@ -33,7 +35,6 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; -import com.starrocks.connector.flink.catalog.StarRocksCatalog; import com.starrocks.connector.flink.catalog.StarRocksCatalogException; import com.starrocks.connector.flink.catalog.StarRocksColumn; import com.starrocks.connector.flink.catalog.StarRocksTable; @@ -53,14 +54,14 @@ public class StarRocksMetadataApplier implements MetadataApplier { private static final Logger LOG = LoggerFactory.getLogger(StarRocksMetadataApplier.class); - private final StarRocksCatalog catalog; + private final StarRocksEnrichedCatalog catalog; private final TableCreateConfig tableCreateConfig; private final SchemaChangeConfig schemaChangeConfig; private boolean isOpened; private Set enabledSchemaEvolutionTypes; public StarRocksMetadataApplier( - StarRocksCatalog catalog, + StarRocksEnrichedCatalog catalog, TableCreateConfig tableCreateConfig, SchemaChangeConfig schemaChangeConfig) { this.catalog = catalog; @@ -87,7 +88,9 @@ public Set getSupportedSchemaEvolutionTypes() { return Sets.newHashSet( SchemaChangeEventType.CREATE_TABLE, SchemaChangeEventType.ADD_COLUMN, - SchemaChangeEventType.DROP_COLUMN); + SchemaChangeEventType.DROP_COLUMN, + SchemaChangeEventType.DROP_TABLE, + SchemaChangeEventType.TRUNCATE_TABLE); } @Override @@ -117,14 +120,16 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) return null; }, dropTableEvent -> { - throw new UnsupportedSchemaChangeEventException(dropTableEvent); + applyDropTable(dropTableEvent); + return null; }, renameColumnEvent -> { applyRenameColumn(renameColumnEvent); return null; }, truncateTableEvent -> { - throw new UnsupportedSchemaChangeEventException(truncateTableEvent); + applyTruncateTable(truncateTableEvent); + return null; }); } @@ -315,4 +320,24 @@ private void applyAlterColumnType(AlterColumnTypeEvent alterColumnTypeEvent) // the alter after a discussion. throw new UnsupportedSchemaChangeEventException(alterColumnTypeEvent); } + + private void applyTruncateTable(TruncateTableEvent truncateTableEvent) { + try { + catalog.truncateTable( + truncateTableEvent.tableId().getSchemaName(), + truncateTableEvent.tableId().getTableName()); + } catch (StarRocksCatalogException e) { + throw new SchemaEvolveException(truncateTableEvent, e.getMessage(), e); + } + } + + private void applyDropTable(DropTableEvent dropTableEvent) { + try { + catalog.dropTable( + dropTableEvent.tableId().getSchemaName(), + dropTableEvent.tableId().getTableName()); + } catch (StarRocksCatalogException e) { + throw new SchemaEvolveException(dropTableEvent, e.getMessage(), e); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java index 643a8b31a86..c12be85a735 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java @@ -29,7 +29,7 @@ import java.util.Optional; /** Mock {@link StarRocksCatalog} for testing. */ -public class MockStarRocksCatalog extends StarRocksCatalog { +public class MockStarRocksCatalog extends StarRocksEnrichedCatalog { /** database name -> table name -> table. */ private final Map> tables; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java index 338af86d50b..8afd11fe7d0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java @@ -20,18 +20,24 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.PhysicalColumn; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; @@ -39,11 +45,14 @@ import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer; import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; @@ -350,8 +359,70 @@ public void testStarRocksNarrowingAlterColumnType() throws Exception { runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId)); } + @Test + public void testStarRocksTruncateTable() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + List truncateTableTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob")), + new TruncateTableEvent(tableId), + DataChangeEvent.insertEvent(tableId, generate(schema, 3, 4.5, "Cecily")), + DataChangeEvent.insertEvent(tableId, generate(schema, 4, 5.6, "Derrida"))); + runJobWithEvents(truncateTableTestingEvents); + + assertEqualsInAnyOrder( + Arrays.asList("3 | 4.5 | Cecily", "4 | 5.6 | Derrida"), + fetchTableContent(tableId, 3)); + } + + @Test + public void testStarRocksDropTable() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + List dropTableTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob")), + new DropTableEvent(tableId)); + runJobWithEvents(dropTableTestingEvents); + + Assert.assertThrows( + String.format( + "Getting analyzing error. Detail message: Unknown table '%s.%s'.", + tableId.getSchemaName(), tableId.getTableName()), + MySQLSyntaxErrorException.class, + () -> fetchTableContent(tableId, 3)); + } + private void runJobWithEvents(List events) throws Exception { - DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class)); + DataStream stream = + env.fromCollection(events, TypeInformation.of(Event.class)).setParallelism(1); Configuration config = new Configuration() @@ -392,4 +463,16 @@ private void runJobWithEvents(List events) throws Exception { env.execute("StarRocks Schema Evolution Test"); } + + BinaryRecordData generate(Schema schema, Object... fields) { + return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) + .generate( + Arrays.stream(fields) + .map( + e -> + (e instanceof String) + ? BinaryStringData.fromString((String) e) + : e) + .toArray()); + } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java index 508eebdb704..33b7ce63b82 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java @@ -55,6 +55,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; /** End-to-end tests for mysql cdc to Doris pipeline job. */ @@ -396,6 +397,244 @@ public void testComplexDataTypes() throws Exception { } } + @Test + public void testSchemaEvolution() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: doris\n" + + " fenodes: doris:8030\n" + + " benodes: doris:8040\n" + + " username: %s\n" + + " password: \"%s\"\n" + + " table.create.properties.replication_num: 1\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: %d", + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + DORIS.getUsername(), + DORIS.getPassword(), + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null")); + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "customers", + 4, + Arrays.asList( + "101 | user_1 | Shanghai | 123567891234", + "102 | user_2 | Shanghai | 123567891234", + "103 | user_3 | Shanghai | 123567891234", + "104 | user_4 | Shanghai | 123567891234")); + + LOG.info("Begin incremental reading stage."); + + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110 + + // Ensure we've entered binlog reading stage + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null")); + + // Schema change - Add Column + stat.execute("ALTER TABLE products ADD COLUMN extras INT;"); + Thread.sleep(1000L); + stat.execute( + "INSERT INTO products VALUES (default, 'blt', 'bacon, lettuce and tomato sandwich', 0.2, null, null, null, 17)"); // 111 + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 8, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | null", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0} | null", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null | null", + "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null | 17")); + + // Schema change - Rename Column + stat.execute("ALTER TABLE products RENAME COLUMN extras TO extra_col;"); + Thread.sleep(1000L); + stat.execute( + "INSERT INTO products VALUES (default, 'cheeseburger', 'meat patty, cheese slice and onions', 0.1, null, null, null, 18)"); // 112 + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 8, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | null", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0} | null", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null | null", + "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null | 17", + "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null | 18")); + + // Schema change - Alter Column Type + stat.execute("ALTER TABLE products MODIFY COLUMN extra_col double;"); + Thread.sleep(1000L); + stat.execute( + "INSERT INTO products VALUES (default, 'fries', 'potato and salt', 0.05, null, null, null, 19)"); // 113 + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 8, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | null", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0} | null", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null | null", + "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null | 17.0", + "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null | 18.0", + "113 | fries | potato and salt | 0.05 | null | null | null | 19.0")); + + // Schema change - Drop Column + stat.execute("ALTER TABLE products DROP COLUMN extra_col;"); + Thread.sleep(1000L); + stat.execute( + "INSERT INTO products VALUES (default, 'mac', 'cheese', 0.025, null, null, null)"); // 114 + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null", + "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null", + "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null", + "113 | fries | potato and salt | 0.05 | null | null | null", + "114 | mac | cheese | 0.025 | null | null | null")); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("TRUNCATE TABLE products;"); + Thread.sleep(1000L); + stat.execute( + "INSERT INTO products VALUES (default, 'pasta', 'noodles', 0, null, null, null);"); // 1, because truncating resets auto_increment id + } + + validateSinkResult( + mysqlInventoryDatabase.getDatabaseName(), + "products", + 7, + Collections.singletonList("1 | pasta | noodles | 0.0 | null | null | null")); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("DROP TABLE products;"); + } + Thread.sleep(1000L); + + SQLException thrown = + assertThrows( + SQLException.class, + () -> { + try (Connection conn = + DriverManager.getConnection( + DORIS.getJdbcUrl( + mysqlInventoryDatabase + .getDatabaseName(), + DORIS.getUsername())); + Statement stat = conn.createStatement()) { + stat.executeQuery("SELECT * FROM products;"); + } + }); + assertTrue( + thrown.getMessage() + .contains("errCode = 2, detailMessage = Unknown table 'products'")); + } + public static void createDorisDatabase(String databaseName) { try { Container.ExecResult rs = @@ -437,6 +676,16 @@ public static void dropDorisDatabase(String databaseName) { private void validateSinkResult( String databaseName, String tableName, int columnCount, List expected) throws Exception { + validateSqlResults( + databaseName, + String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName), + columnCount, + expected); + } + + private void validateSqlResults( + String databaseName, String sql, int columnCount, List expected) + throws Exception { long startWaitingTimestamp = System.currentTimeMillis(); while (true) { if (System.currentTimeMillis() - startWaitingTimestamp @@ -448,9 +697,7 @@ private void validateSinkResult( DriverManager.getConnection( DORIS.getJdbcUrl(databaseName, DORIS.getUsername())); Statement stat = conn.createStatement()) { - ResultSet rs = - stat.executeQuery( - String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName)); + ResultSet rs = stat.executeQuery(sql); while (rs.next()) { List columns = new ArrayList<>(); From 06eb4c35e50b5a2425b4406e0257ff7a3b31f12c Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 30 Oct 2024 19:29:56 +0800 Subject: [PATCH 02/12] fix: two-stage test to make ci happy Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../sink/DorisMetadataApplierITCase.java | 47 ++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index 7aa0b400c38..38d20960d38 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -73,6 +73,7 @@ import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME; +import static org.junit.Assert.fail; /** IT tests for {@link DorisMetadataApplier}. */ @RunWith(Parameterized.class) @@ -452,16 +453,25 @@ public void testDorisTruncateTable() throws Exception { .primaryKey("id") .build(); - List truncateTableTestingEvents = + List preparationTestingEvents = Arrays.asList( new CreateTableEvent(tableId, schema), DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), - DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob"))); + runJobWithEvents(preparationTestingEvents); + waitAndVerify( + tableId, + 3, + Arrays.asList("1 | 2.3 | Alice", "2 | 3.4 | Bob"), + DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L); + + List truncateTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), new TruncateTableEvent(tableId), DataChangeEvent.insertEvent(tableId, generate(schema, 3, 4.5, "Cecily")), DataChangeEvent.insertEvent(tableId, generate(schema, 4, 5.6, "Derrida"))); - runJobWithEvents(truncateTableTestingEvents); - + runJobWithEvents(truncateTestingEvents); assertEqualsInAnyOrder( Arrays.asList("3 | 4.5 | Cecily", "4 | 5.6 | Derrida"), fetchTableContent(tableId, 3)); @@ -488,8 +498,11 @@ public void testDorisDropTable() throws Exception { DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob"))); runJobWithEvents(preparationTestingEvents); - assertEqualsInAnyOrder( - Arrays.asList("1 | 2.3 | Alice", "2 | 3.4 | Bob"), fetchTableContent(tableId, 3)); + waitAndVerify( + tableId, + 3, + Arrays.asList("1 | 2.3 | Alice", "2 | 3.4 | Bob"), + DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L); runJobWithEvents( Arrays.asList(new CreateTableEvent(tableId, schema), new DropTableEvent(tableId))); @@ -566,4 +579,26 @@ BinaryRecordData generate(Schema schema, Object... fields) { : e) .toArray()); } + + private void waitAndVerify( + TableId tableId, int numberOfColumns, List expected, long timeoutMilliseconds) + throws Exception { + long timeout = System.currentTimeMillis() + timeoutMilliseconds; + while (System.currentTimeMillis() < timeout) { + List actual = fetchTableContent(tableId, numberOfColumns); + if (expected.stream() + .sorted() + .collect(Collectors.toList()) + .equals(actual.stream().sorted().collect(Collectors.toList()))) { + return; + } + LOG.info( + "Content of {} isn't ready.\nExpected: {}\nActual: {}", + tableId, + expected, + actual); + Thread.sleep(1000L); + } + fail(String.format("Failed to verify content of %s.", tableId)); + } } From 4ce48b42455c766800d7e5875e3135ae33f62b8b Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Sat, 2 Nov 2024 22:12:26 +0800 Subject: [PATCH 03/12] Rename variable name --- .../connectors/doris/sink/DorisSchemaChangeManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java index d3b7ca5a07a..a4636f0457c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java @@ -33,15 +33,15 @@ public DorisSchemaChangeManager(DorisOptions dorisOptions) { public boolean truncateTable(String databaseName, String tableName) throws IOException, IllegalArgumentException { - String createTableDDL = + String truncateTableDDL = "TRUNCATE TABLE " + identifier(databaseName) + "." + identifier(tableName); - return this.execute(createTableDDL, databaseName); + return this.execute(truncateTableDDL, databaseName); } public boolean dropTable(String databaseName, String tableName) throws IOException, IllegalArgumentException { - String createTableDDL = + String dropTableDDL = "DROP TABLE " + identifier(databaseName) + "." + identifier(tableName); - return this.execute(createTableDDL, databaseName); + return this.execute(dropTableDDL, databaseName); } } From d0ae003d347b73ba84ec165a266d2d6b39f5d9b7 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 7 Jan 2025 12:01:20 +0800 Subject: [PATCH 04/12] Resolve conflicts Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../paimon/sink/v2/PaimonSinkITCase.java | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 44b3aa70bd6..e218207758f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -72,11 +72,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.OptionalLong; -import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -184,7 +182,8 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) Committer committer = paimonSink.createCommitter(); // insert - writeAndCommit(writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); + writeAndCommit( + writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); Assertions.assertThat(fetchResults(table1)) .isEqualTo( Arrays.asList( @@ -242,7 +241,8 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto Committer committer = paimonSink.createCommitter(); // 1. receive only DataChangeEvents during one checkpoint - writeAndCommit(writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); + writeAndCommit( + writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); Assertions.assertThat(fetchResults(table1)) .isEqualTo( Arrays.asList( @@ -380,7 +380,7 @@ private static void commit( throws IOException, InterruptedException { Collection> commitRequests = writer.prepareCommit().stream() - .map(this::correctCheckpointId) + .map(PaimonSinkITCase::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -453,7 +453,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele writer.flush(false); Collection> commitRequests = writer.prepareCommit().stream() - .map(this::correctCheckpointId) + .map(PaimonSinkITCase::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -468,25 +468,25 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele // CommitterOperator will try to re-commit recovered transactions. committer.commit(commitRequests); List events = - Arrays.asList( + Collections.singletonList( DataChangeEvent.insertEvent( table1, - generator.generate( - new Object[] { - BinaryStringData.fromString(Integer.toString(i)), - BinaryStringData.fromString(Integer.toString(i)) - }))); - Assertions.assertDoesNotThrow( - () -> { - for (Event event : events) { - writer.write(event, null); - } - }); + generate( + Arrays.asList( + Tuple2.of(STRING(), String.valueOf(i)), + Tuple2.of(STRING(), String.valueOf(i)))))); + Assertions.assertThatCode( + () -> { + for (Event event : events) { + writer.write(event, null); + } + }) + .doesNotThrowAnyException(); writer.flush(false); // Checkpoint id start from 1 committer.commit( writer.prepareCommit().stream() - .map(this::correctCheckpointId) + .map(PaimonSinkITCase::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList())); } @@ -498,10 +498,10 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele .forEachRemaining(result::add); if (enableDeleteVector) { // Each APPEND will trigger COMPACT once enable deletion-vectors. - Assertions.assertEquals(16, result.size()); + Assertions.assertThat(result).hasSize(16); } else { // 8 APPEND and 1 COMPACT - Assertions.assertEquals(9, result.size()); + Assertions.assertThat(result).hasSize(9); } result.clear(); @@ -509,8 +509,8 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele .execute() .collect() .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( + Assertions.assertThat(result) + .containsExactly( Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2"), Row.ofKind(RowKind.INSERT, "3", "3"), @@ -518,11 +518,10 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele Row.ofKind(RowKind.INSERT, "5", "5"), Row.ofKind(RowKind.INSERT, "6", "6"), Row.ofKind(RowKind.INSERT, "7", "7"), - Row.ofKind(RowKind.INSERT, "8", "8")), - result); + Row.ofKind(RowKind.INSERT, "8", "8")); } - private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) { + private static MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) { // update the right checkpointId for MultiTableCommittable return new MultiTableCommittable( committable.getDatabase(), From 7a8b16a773ab4614dff0ddd5988cc36d7371a983 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 7 Jan 2025 13:43:12 +0800 Subject: [PATCH 05/12] Resolve conflicts Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../paimon/sink/v2/PaimonSinkITCase.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index e218207758f..c52f0a6a4f9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -72,9 +72,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.OptionalLong; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -263,6 +265,7 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto new AddColumnEvent(table1, Collections.singletonList(columnWithPosition)); PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); metadataApplier.applySchemaChange(addColumnEvent); + writer.write(addColumnEvent, null); writeAndCommit( writer, @@ -305,6 +308,10 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto generateInsert( table1, Arrays.asList(Tuple2.of(STRING(), "6"), Tuple2.of(STRING(), "6")))); + List result = fetchResults(TableId.tableId("test", "`table1$files`")); + Set deduplicated = new HashSet<>(result); + Assertions.assertThat(result).hasSameSizeAs(deduplicated); + Assertions.assertThat(fetchResults(table1)) .isEqualTo( Arrays.asList( @@ -317,7 +324,9 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto TruncateTableEvent truncateTableEvent = new TruncateTableEvent(table1); metadataApplier.applySchemaChange(truncateTableEvent); + Assertions.assertThat(fetchResults(table1)).isEmpty(); + // FIXME: This check will fail when deleteVector is true writeAndCommit( writer, committer, @@ -469,12 +478,11 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele committer.commit(commitRequests); List events = Collections.singletonList( - DataChangeEvent.insertEvent( + generateInsert( table1, - generate( - Arrays.asList( - Tuple2.of(STRING(), String.valueOf(i)), - Tuple2.of(STRING(), String.valueOf(i)))))); + Arrays.asList( + Tuple2.of(STRING(), String.valueOf(i)), + Tuple2.of(STRING(), String.valueOf(i))))); Assertions.assertThatCode( () -> { for (Event event : events) { From bb68781bd5d400062c8a22e0bf93e6e0d4366063 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 9 Jan 2025 12:38:46 +0800 Subject: [PATCH 06/12] minor: fix test case failure when deleteVector = true Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../connectors/paimon/sink/v2/PaimonSinkITCase.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index c52f0a6a4f9..28e3cc5df89 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -326,16 +326,6 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto metadataApplier.applySchemaChange(truncateTableEvent); Assertions.assertThat(fetchResults(table1)).isEmpty(); - // FIXME: This check will fail when deleteVector is true - writeAndCommit( - writer, - committer, - generateInsert( - table1, Arrays.asList(Tuple2.of(STRING(), "7"), Tuple2.of(STRING(), "7")))); - - Assertions.assertThat(fetchResults(table1)) - .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "7", "7"))); - DropTableEvent dropTableEvent = new DropTableEvent(table1); metadataApplier.applySchemaChange(dropTableEvent); Assertions.assertThatThrownBy(() -> fetchResults(table1)) From 3b94e533087b845b91de600de350b88a96da0cb6 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 10 Jan 2025 10:45:54 +0800 Subject: [PATCH 07/12] Resolve conflicts Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../paimon/sink/v2/PaimonSinkITCase.java | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 28e3cc5df89..93061c18170 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -187,10 +187,8 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) writeAndCommit( writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); Assertions.assertThat(fetchResults(table1)) - .isEqualTo( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), - Row.ofKind(RowKind.INSERT, "2", "2"))); + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")); // delete writeAndCommit( @@ -200,7 +198,7 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) table1, Arrays.asList(Tuple2.of(STRING(), "1"), Tuple2.of(STRING(), "1")))); Assertions.assertThat(fetchResults(table1)) - .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "2"))); + .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", "2")); // update writeAndCommit( @@ -211,22 +209,18 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "2")), Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "x")))); Assertions.assertThat(fetchResults(table1)) - .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "x"))); + .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", "x")); if (enableDeleteVector) { - Assertions.assertThat(fetchResults( - TableId.tableId("test", "`table1$files`") - )).containsExactly( - Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L) - ); + Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName())) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L)); } else { - Assertions.assertThat(fetchResults( - TableId.tableId("test", "`table1$files`") - )).containsExactly( - Row.ofKind(RowKind.INSERT, 1L), - Row.ofKind(RowKind.INSERT, 2L), - Row.ofKind(RowKind.INSERT, 3L) - ); + Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName())) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1L), + Row.ofKind(RowKind.INSERT, 2L), + Row.ofKind(RowKind.INSERT, 3L)); } } @@ -246,10 +240,8 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto writeAndCommit( writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); Assertions.assertThat(fetchResults(table1)) - .isEqualTo( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), - Row.ofKind(RowKind.INSERT, "2", "2"))); + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")); // 2. receive DataChangeEvents and SchemaChangeEvents during one checkpoint writeAndCommit( @@ -278,12 +270,11 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto Tuple2.of(STRING(), "4")))); Assertions.assertThat(fetchResults(table1)) - .isEqualTo( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1", null), - Row.ofKind(RowKind.INSERT, "2", "2", null), - Row.ofKind(RowKind.INSERT, "3", "3", null), - Row.ofKind(RowKind.INSERT, "4", "4", "4"))); + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1", null), + Row.ofKind(RowKind.INSERT, "2", "2", null), + Row.ofKind(RowKind.INSERT, "3", "3", null), + Row.ofKind(RowKind.INSERT, "4", "4", "4")); // 2. receive DataChangeEvents and SchemaChangeEvents during one checkpoint writeAndCommit( @@ -313,14 +304,13 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto Assertions.assertThat(result).hasSameSizeAs(deduplicated); Assertions.assertThat(fetchResults(table1)) - .isEqualTo( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", null), - Row.ofKind(RowKind.INSERT, "2", null), - Row.ofKind(RowKind.INSERT, "3", null), - Row.ofKind(RowKind.INSERT, "4", "4"), - Row.ofKind(RowKind.INSERT, "5", "5"), - Row.ofKind(RowKind.INSERT, "6", "6"))); + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", null), + Row.ofKind(RowKind.INSERT, "2", null), + Row.ofKind(RowKind.INSERT, "3", null), + Row.ofKind(RowKind.INSERT, "4", "4"), + Row.ofKind(RowKind.INSERT, "5", "5"), + Row.ofKind(RowKind.INSERT, "6", "6")); TruncateTableEvent truncateTableEvent = new TruncateTableEvent(table1); metadataApplier.applySchemaChange(truncateTableEvent); @@ -366,12 +356,10 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector writeAndCommit(writer, committer, testEvents.toArray(new Event[0])); Assertions.assertThat(fetchResults(table1)) - .isEqualTo( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), - Row.ofKind(RowKind.INSERT, "2", "2"))); + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")); Assertions.assertThat(fetchResults(table2)) - .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1"))); + .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "1")); } private static void commit( @@ -404,6 +392,18 @@ private List fetchResults(TableId tableId) { return results; } + private List fetchMaxSequenceNumber(String tableName) { + List results = new ArrayList<>(); + tEnv.sqlQuery( + "select max_sequence_number from paimon_catalog.test.`" + + tableName + + "$files`") + .execute() + .collect() + .forEachRemaining(results::add); + return results; + } + private BinaryRecordData generate(List> elements) { BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator( @@ -508,7 +508,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele .collect() .forEachRemaining(result::add); Assertions.assertThat(result) - .containsExactly( + .containsExactlyInAnyOrder( Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2"), Row.ofKind(RowKind.INSERT, "3", "3"), From a27a9b523424258e316fd9cc69b12718348c7083 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 10 Jan 2025 19:43:04 +0800 Subject: [PATCH 08/12] Skip TRUNCATE TABLE test with deletion vector = true Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../paimon/sink/PaimonMetadataApplier.java | 13 ++++++++++--- .../connectors/paimon/sink/v2/PaimonSinkITCase.java | 11 +++++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index f8c38cd35f7..0ac52637b54 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.common.types.utils.DataTypeUtils; @@ -326,9 +327,15 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv } private void applyTruncateTable(TruncateTableEvent event) throws SchemaEvolveException { - try (BatchTableCommit batchTableCommit = - catalog.getTable(tableIdToIdentifier(event)).newBatchWriteBuilder().newCommit()) { - batchTableCommit.truncateTable(); + try { + Table table = catalog.getTable(tableIdToIdentifier(event)); + if (table.options().get("deletion-vectors.enabled").equals("true")) { + throw new UnsupportedSchemaChangeEventException( + event, "Unable to truncate a table with deletion vectors enabled.", null); + } + try (BatchTableCommit batchTableCommit = table.newBatchWriteBuilder().newCommit()) { + batchTableCommit.truncateTable(); + } } catch (Exception e) { throw new SchemaEvolveException(event, "Failed to apply truncate table event", e); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 93061c18170..0d176753839 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -38,6 +38,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -313,8 +314,14 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto Row.ofKind(RowKind.INSERT, "6", "6")); TruncateTableEvent truncateTableEvent = new TruncateTableEvent(table1); - metadataApplier.applySchemaChange(truncateTableEvent); - Assertions.assertThat(fetchResults(table1)).isEmpty(); + if (enableDeleteVector) { + Assertions.assertThatThrownBy( + () -> metadataApplier.applySchemaChange(truncateTableEvent)) + .isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class); + } else { + metadataApplier.applySchemaChange(truncateTableEvent); + Assertions.assertThat(fetchResults(table1)).isEmpty(); + } DropTableEvent dropTableEvent = new DropTableEvent(table1); metadataApplier.applySchemaChange(dropTableEvent); From b4e710abe739747afead6db5b6a85ba18cded5fc Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Sun, 12 Jan 2025 11:44:34 +0800 Subject: [PATCH 09/12] nit: make test more robust by adding retryable timeout Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../connectors/doris/sink/DorisMetadataApplierITCase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index 38d20960d38..d71bfe05bb7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -472,9 +472,11 @@ public void testDorisTruncateTable() throws Exception { DataChangeEvent.insertEvent(tableId, generate(schema, 3, 4.5, "Cecily")), DataChangeEvent.insertEvent(tableId, generate(schema, 4, 5.6, "Derrida"))); runJobWithEvents(truncateTestingEvents); - assertEqualsInAnyOrder( + waitAndVerify( + tableId, + 3, Arrays.asList("3 | 4.5 | Cecily", "4 | 5.6 | Derrida"), - fetchTableContent(tableId, 3)); + DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L); } @Test From 1bb0a105ba578747bdcb0a7a8e1e95155cd30439 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 14 Jan 2025 11:18:22 +0800 Subject: [PATCH 10/12] fix: exception catching hierarchy Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 0d176753839..bc257d0db90 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -317,7 +317,11 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto if (enableDeleteVector) { Assertions.assertThatThrownBy( () -> metadataApplier.applySchemaChange(truncateTableEvent)) - .isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class); + .isExactlyInstanceOf(SchemaEvolveException.class) + .cause() + .isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class) + .extracting("exceptionMessage") + .isEqualTo("Unable to truncate a table with deletion vectors enabled."); } else { metadataApplier.applySchemaChange(truncateTableEvent); Assertions.assertThat(fetchResults(table1)).isEmpty(); From 74767deec137e6f811cf0dd99839630aba7353c4 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 14 Jan 2025 11:54:57 +0800 Subject: [PATCH 11/12] fix: catch exceptions well... Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java index 33b7ce63b82..9110cf61c11 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java @@ -43,6 +43,7 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLSyntaxErrorException; import java.sql.Statement; import java.time.Duration; import java.time.temporal.ChronoUnit; @@ -618,7 +619,7 @@ public void testSchemaEvolution() throws Exception { SQLException thrown = assertThrows( - SQLException.class, + SQLSyntaxErrorException.class, () -> { try (Connection conn = DriverManager.getConnection( From 42c504f5e4c3d62246fe2e7895bf66bed149cd68 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 14 Jan 2025 14:34:10 +0800 Subject: [PATCH 12/12] nit: fix unstable `MySqlToDorisE2eITCase` Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../pipeline/tests/MySqlToDorisE2eITCase.java | 472 ++++++++++++------ 1 file changed, 319 insertions(+), 153 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java index 9110cf61c11..a83c6484a23 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Container; -import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.lifecycle.Startables; @@ -46,7 +45,6 @@ import java.sql.SQLSyntaxErrorException; import java.sql.Statement; import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -54,10 +52,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** End-to-end tests for mysql cdc to Doris pipeline job. */ @RunWith(Parameterized.class) @@ -70,8 +67,8 @@ public class MySqlToDorisE2eITCase extends PipelineTestEnvironment { protected static final String MYSQL_TEST_USER = "mysqluser"; protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; - public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; - public static final int TESTCASE_TIMEOUT_SECONDS = 60; + public static final Duration DEFAULT_STARTUP_TIMEOUT = Duration.ofSeconds(240); + public static final Duration DEFAULT_RESULT_VERIFY_TIMEOUT = Duration.ofSeconds(30); @ClassRule public static final MySqlContainer MYSQL = @@ -84,14 +81,11 @@ public class MySqlToDorisE2eITCase extends PipelineTestEnvironment { .withUsername("flinkuser") .withPassword("flinkpw") .withNetwork(NETWORK) - .withNetworkAliases("mysql") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + .withNetworkAliases("mysql"); @ClassRule public static final DorisContainer DORIS = - new DorisContainer(NETWORK) - .withNetworkAliases("doris") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + new DorisContainer(NETWORK).withNetworkAliases("doris"); protected final UniqueDatabase mysqlInventoryDatabase = new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); @@ -110,14 +104,13 @@ public static void initializeContainers() { new LogMessageWaitStrategy() .withRegEx(".*get heartbeat from FE.*") .withTimes(1) - .withStartupTimeout( - Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS)) + .withStartupTimeout(DEFAULT_STARTUP_TIMEOUT) .waitUntilReady(DORIS); while (!checkBackendAvailability()) { try { if (System.currentTimeMillis() - startWaitingTimestamp - > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + > DEFAULT_STARTUP_TIMEOUT.toMillis()) { throw new RuntimeException("Doris backend startup timed out."); } LOG.info("Waiting for backends to be available"); @@ -175,6 +168,7 @@ public void after() { @Test public void testSyncWholeDatabase() throws Exception { + String databaseName = mysqlInventoryDatabase.getDatabaseName(); String pipelineJob = String.format( "source:\n" @@ -199,7 +193,7 @@ public void testSyncWholeDatabase() throws Exception { + " parallelism: %d", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - mysqlInventoryDatabase.getDatabaseName(), + databaseName, DORIS.getUsername(), DORIS.getPassword(), parallelism); @@ -210,8 +204,19 @@ public void testSyncWholeDatabase() throws Exception { waitUntilJobRunning(Duration.ofSeconds(30)); LOG.info("Pipeline job is running"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null")); validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 7, Arrays.asList( @@ -225,8 +230,16 @@ public void testSyncWholeDatabase() throws Exception { "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null")); + validateSinkSchema( + databaseName, + "customers", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "address | VARCHAR(3072) | Yes | false | null", + "phone_number | VARCHAR(1536) | Yes | false | null")); validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "customers", 4, Arrays.asList( @@ -240,9 +253,7 @@ public void testSyncWholeDatabase() throws Exception { String mysqlJdbcUrl = String.format( "jdbc:mysql://%s:%s/%s", - MYSQL.getHost(), - MYSQL.getDatabasePort(), - mysqlInventoryDatabase.getDatabaseName()); + MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName); try (Connection conn = DriverManager.getConnection( mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); @@ -252,7 +263,7 @@ public void testSyncWholeDatabase() throws Exception { "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110 validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 7, Arrays.asList( @@ -269,39 +280,65 @@ public void testSyncWholeDatabase() throws Exception { stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + validateSinkResult( + databaseName, + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.1 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null")); // modify table schema stat.execute("ALTER TABLE products DROP COLUMN point_c;"); - stat.execute("DELETE FROM products WHERE id=101;"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null")); + stat.execute("DELETE FROM products WHERE id=101;"); stat.execute( "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111 stat.execute( "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112 + validateSinkResult( + databaseName, + "products", + 7, + Arrays.asList( + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null", + "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.1 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null", + "111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null", + "112 | finally | null | 2.14 | null | null | null")); } catch (SQLException e) { LOG.error("Update table for CDC failed.", e); throw e; } - validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), - "products", - 7, - Arrays.asList( - "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null", - "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null", - "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null", - "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null", - "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null", - "107 | rocks | box of assorted rocks | 5.1 | null | null | null", - "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", - "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", - "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null", - "111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null", - "112 | finally | null | 2.14 | null | null | null")); } @Test public void testComplexDataTypes() throws Exception { + String databaseName = complexDataTypesDatabase.getDatabaseName(); String pipelineJob = String.format( "source:\n" @@ -330,32 +367,87 @@ public void testComplexDataTypes() throws Exception { + " parallelism: %d", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - complexDataTypesDatabase.getDatabaseName(), + databaseName, DORIS.getUsername(), DORIS.getPassword(), - complexDataTypesDatabase.getDatabaseName(), + databaseName, parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar); waitUntilJobRunning(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + + LOG.info("Verifying snapshot stage of DATA_TYPES_TABLE..."); + validateSinkSchema( + databaseName, + "DATA_TYPES_TABLE", + Arrays.asList( + "id | INT | Yes | true | null", + "tiny_c | TINYINT | Yes | false | null", + "tiny_un_c | SMALLINT | Yes | false | null", + "tiny_un_z_c | SMALLINT | Yes | false | null", + "small_c | SMALLINT | Yes | false | null", + "small_un_c | INT | Yes | false | null", + "small_un_z_c | INT | Yes | false | null", + "medium_c | INT | Yes | false | null", + "medium_un_c | INT | Yes | false | null", + "medium_un_z_c | INT | Yes | false | null", + "int_c | INT | Yes | false | null", + "int_un_c | BIGINT | Yes | false | null", + "int_un_z_c | BIGINT | Yes | false | null", + "int11_c | INT | Yes | false | null", + "big_c | BIGINT | Yes | false | null", + "varchar_c | VARCHAR(765) | Yes | false | null", + "char_c | CHAR(9) | Yes | false | null", + "real_c | DOUBLE | Yes | false | null", + "float_c | FLOAT | Yes | false | null", + "float_un_c | FLOAT | Yes | false | null", + "float_un_z_c | FLOAT | Yes | false | null", + "double_c | DOUBLE | Yes | false | null", + "double_un_c | DOUBLE | Yes | false | null", + "double_un_z_c | DOUBLE | Yes | false | null", + "decimal_c | DECIMAL(8, 4) | Yes | false | null", + "decimal_un_c | DECIMAL(8, 4) | Yes | false | null", + "decimal_un_z_c | DECIMAL(8, 4) | Yes | false | null", + "numeric_c | DECIMAL(6, 0) | Yes | false | null", + "big_decimal_c | TEXT | Yes | false | null", + "bit1_c | BOOLEAN | Yes | false | null", + "tiny1_c | BOOLEAN | Yes | false | null", + "boolean_c | BOOLEAN | Yes | false | null", + "date_c | DATE | Yes | false | null", + "datetime3_c | DATETIME(3) | Yes | false | null", + "datetime6_c | DATETIME(6) | Yes | false | null", + "timestamp_c | DATETIME | Yes | false | null", + "text_c | TEXT | Yes | false | null", + "tiny_blob_c | TEXT | Yes | false | null", + "blob_c | TEXT | Yes | false | null", + "medium_blob_c | TEXT | Yes | false | null", + "long_blob_c | TEXT | Yes | false | null", + "year_c | INT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "point_c | TEXT | Yes | false | null", + "geometry_c | TEXT | Yes | false | null", + "linestring_c | TEXT | Yes | false | null", + "polygon_c | TEXT | Yes | false | null", + "multipoint_c | TEXT | Yes | false | null", + "multiline_c | TEXT | Yes | false | null", + "multipolygon_c | TEXT | Yes | false | null", + "geometrycollection_c | TEXT | Yes | false | null", + "FINE | TEXT | Yes | false | null")); validateSinkResult( - complexDataTypesDatabase.getDatabaseName(), + databaseName, "DATA_TYPES_TABLE", 52, Collections.singletonList( "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine")); - LOG.info("Begin incremental reading stage."); + LOG.info("Verifying streaming stage of DATA_TYPES_TABLE..."); // generate binlogs String mysqlJdbcUrl = String.format( "jdbc:mysql://%s:%s/%s", - MYSQL.getHost(), - MYSQL.getDatabasePort(), - complexDataTypesDatabase.getDatabaseName()); + MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName); try (Connection conn = DriverManager.getConnection( mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); @@ -385,7 +477,7 @@ public void testComplexDataTypes() throws Exception { } validateSinkResult( - complexDataTypesDatabase.getDatabaseName(), + databaseName, "DATA_TYPES_TABLE", 52, Arrays.asList( @@ -400,6 +492,7 @@ public void testComplexDataTypes() throws Exception { @Test public void testSchemaEvolution() throws Exception { + String databaseName = mysqlInventoryDatabase.getDatabaseName(); String pipelineJob = String.format( "source:\n" @@ -425,7 +518,7 @@ public void testSchemaEvolution() throws Exception { + " parallelism: %d", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - mysqlInventoryDatabase.getDatabaseName(), + databaseName, DORIS.getUsername(), DORIS.getPassword(), parallelism); @@ -434,10 +527,21 @@ public void testSchemaEvolution() throws Exception { Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar); waitUntilJobRunning(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + LOG.info("Verifying snapshot data from `products`..."); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null")); validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 7, Arrays.asList( @@ -451,8 +555,17 @@ public void testSchemaEvolution() throws Exception { "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null")); + LOG.info("Verifying snapshot data from `customers`..."); + validateSinkSchema( + databaseName, + "customers", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "address | VARCHAR(3072) | Yes | false | null", + "phone_number | VARCHAR(1536) | Yes | false | null")); validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "customers", 4, Arrays.asList( @@ -461,26 +574,23 @@ public void testSchemaEvolution() throws Exception { "103 | user_3 | Shanghai | 123567891234", "104 | user_4 | Shanghai | 123567891234")); - LOG.info("Begin incremental reading stage."); - // generate binlogs String mysqlJdbcUrl = String.format( "jdbc:mysql://%s:%s/%s", - MYSQL.getHost(), - MYSQL.getDatabasePort(), - mysqlInventoryDatabase.getDatabaseName()); + MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName); try (Connection conn = DriverManager.getConnection( mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); Statement stat = conn.createStatement()) { + LOG.info("Switching to streaming stage..."); stat.execute( "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110 // Ensure we've entered binlog reading stage validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 7, Arrays.asList( @@ -496,12 +606,24 @@ public void testSchemaEvolution() throws Exception { "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null")); // Schema change - Add Column + LOG.info("Test Schema Change - Add Column..."); stat.execute("ALTER TABLE products ADD COLUMN extras INT;"); - Thread.sleep(1000L); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null", + "extras | INT | Yes | false | null")); stat.execute( "INSERT INTO products VALUES (default, 'blt', 'bacon, lettuce and tomato sandwich', 0.2, null, null, null, 17)"); // 111 validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 8, Arrays.asList( @@ -518,12 +640,24 @@ public void testSchemaEvolution() throws Exception { "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null | 17")); // Schema change - Rename Column + LOG.info("Test Schema Change - Rename Column..."); stat.execute("ALTER TABLE products RENAME COLUMN extras TO extra_col;"); - Thread.sleep(1000L); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null", + "extra_col | INT | Yes | false | null")); stat.execute( "INSERT INTO products VALUES (default, 'cheeseburger', 'meat patty, cheese slice and onions', 0.1, null, null, null, 18)"); // 112 validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 8, Arrays.asList( @@ -541,12 +675,24 @@ public void testSchemaEvolution() throws Exception { "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null | 18")); // Schema change - Alter Column Type - stat.execute("ALTER TABLE products MODIFY COLUMN extra_col double;"); - Thread.sleep(1000L); + LOG.info("Test Schema Change - Alter Column Type..."); + stat.execute("ALTER TABLE products MODIFY COLUMN extra_col DOUBLE;"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null", + "extra_col | DOUBLE | Yes | false | null")); stat.execute( - "INSERT INTO products VALUES (default, 'fries', 'potato and salt', 0.05, null, null, null, 19)"); // 113 + "INSERT INTO products VALUES (default, 'fries', 'potato and salt', 0.05, null, null, null, 19.5)"); // 113 validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 8, Arrays.asList( @@ -562,15 +708,26 @@ public void testSchemaEvolution() throws Exception { "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null | null", "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null | 17.0", "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null | 18.0", - "113 | fries | potato and salt | 0.05 | null | null | null | 19.0")); + "113 | fries | potato and salt | 0.05 | null | null | null | 19.5")); // Schema change - Drop Column + LOG.info("Test Schema Change - Drop Column..."); stat.execute("ALTER TABLE products DROP COLUMN extra_col;"); - Thread.sleep(1000L); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null")); stat.execute( "INSERT INTO products VALUES (default, 'mac', 'cheese', 0.025, null, null, null)"); // 114 validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 7, Arrays.asList( @@ -588,52 +745,39 @@ public void testSchemaEvolution() throws Exception { "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null", "113 | fries | potato and salt | 0.05 | null | null | null", "114 | mac | cheese | 0.025 | null | null | null")); - } catch (SQLException e) { - LOG.error("Update table for CDC failed.", e); - throw e; - } - try (Connection conn = - DriverManager.getConnection( - mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); - Statement stat = conn.createStatement()) { stat.execute("TRUNCATE TABLE products;"); - Thread.sleep(1000L); + Thread.sleep(5000L); stat.execute( "INSERT INTO products VALUES (default, 'pasta', 'noodles', 0, null, null, null);"); // 1, because truncating resets auto_increment id - } - validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), - "products", - 7, - Collections.singletonList("1 | pasta | noodles | 0.0 | null | null | null")); + validateSinkResult( + databaseName, + "products", + 7, + Collections.singletonList("1 | pasta | noodles | 0.0 | null | null | null")); - try (Connection conn = - DriverManager.getConnection( - mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); - Statement stat = conn.createStatement()) { stat.execute("DROP TABLE products;"); + Thread.sleep(5000L); + SQLException thrown = + assertThrows( + SQLSyntaxErrorException.class, + () -> { + try (Connection connection = + DriverManager.getConnection( + DORIS.getJdbcUrl( + databaseName, + DORIS.getUsername())); + Statement statement = connection.createStatement()) { + statement.executeQuery("SELECT * FROM products;"); + } + }); + assertTrue( + thrown.getMessage() + .contains("errCode = 2, detailMessage = Unknown table 'products'")); + } catch (SQLException e) { + throw new RuntimeException("Failed to trigger schema change.", e); } - Thread.sleep(1000L); - - SQLException thrown = - assertThrows( - SQLSyntaxErrorException.class, - () -> { - try (Connection conn = - DriverManager.getConnection( - DORIS.getJdbcUrl( - mysqlInventoryDatabase - .getDatabaseName(), - DORIS.getUsername())); - Statement stat = conn.createStatement()) { - stat.executeQuery("SELECT * FROM products;"); - } - }); - assertTrue( - thrown.getMessage() - .contains("errCode = 2, detailMessage = Unknown table 'products'")); } public static void createDorisDatabase(String databaseName) { @@ -677,65 +821,87 @@ public static void dropDorisDatabase(String databaseName) { private void validateSinkResult( String databaseName, String tableName, int columnCount, List expected) throws Exception { - validateSqlResults( + waitAndVerify( databaseName, - String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName), + "SELECT * FROM " + tableName, columnCount, - expected); + expected, + DEFAULT_RESULT_VERIFY_TIMEOUT.toMillis(), + true); } - private void validateSqlResults( - String databaseName, String sql, int columnCount, List expected) + private void validateSinkSchema(String databaseName, String tableName, List expected) throws Exception { - long startWaitingTimestamp = System.currentTimeMillis(); - while (true) { - if (System.currentTimeMillis() - startWaitingTimestamp - > TESTCASE_TIMEOUT_SECONDS * 1000) { - throw new RuntimeException("Doris backend startup timed out."); - } - List results = new ArrayList<>(); - try (Connection conn = - DriverManager.getConnection( - DORIS.getJdbcUrl(databaseName, DORIS.getUsername())); - Statement stat = conn.createStatement()) { - ResultSet rs = stat.executeQuery(sql); - - while (rs.next()) { - List columns = new ArrayList<>(); - for (int i = 1; i <= columnCount; i++) { - try { - columns.add(rs.getString(i)); - } catch (SQLException ignored) { - // Column count could change after schema evolution - columns.add(null); - } - } - results.add(String.join(" | ", columns)); - } + waitAndVerify( + databaseName, + "DESCRIBE " + tableName, + 5, + expected, + DEFAULT_RESULT_VERIFY_TIMEOUT.toMillis(), + false); + } - if (expected.size() == results.size()) { - assertEqualsInAnyOrder(expected, results); - break; + private void waitAndVerify( + String databaseName, + String sql, + int numberOfColumns, + List expected, + long timeoutMilliseconds, + boolean inAnyOrder) + throws Exception { + long deadline = System.currentTimeMillis() + timeoutMilliseconds; + while (System.currentTimeMillis() < deadline) { + try { + List actual = fetchTableContent(databaseName, sql, numberOfColumns); + if (inAnyOrder) { + if (expected.stream() + .sorted() + .collect(Collectors.toList()) + .equals(actual.stream().sorted().collect(Collectors.toList()))) { + return; + } } else { - Thread.sleep(1000); + if (expected.equals(actual)) { + return; + } } - } catch (SQLException e) { - LOG.info("Validate sink result failure, waiting for next turn...", e); - Thread.sleep(1000); + LOG.info( + "Executing {}::{} didn't get expected results.\nExpected: {}\n Actual: {}", + databaseName, + sql, + expected, + actual); + } catch (SQLSyntaxErrorException t) { + LOG.info("Database {} isn't ready yet. Waiting for the next loop...", databaseName); } + Thread.sleep(1000L); } + fail(String.format("Failed to verify content of %s::%s.", databaseName, sql)); } - public static void assertEqualsInAnyOrder(List expected, List actual) { - assertTrue(expected != null && actual != null); - assertEqualsInOrder( - expected.stream().sorted().collect(Collectors.toList()), - actual.stream().sorted().collect(Collectors.toList())); - } + private List fetchTableContent(String databaseName, String sql, int columnCount) + throws Exception { - public static void assertEqualsInOrder(List expected, List actual) { - assertTrue(expected != null && actual != null); - assertEquals(expected.size(), actual.size()); - assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + List results = new ArrayList<>(); + try (Connection conn = + DriverManager.getConnection( + DORIS.getJdbcUrl(databaseName, DORIS.getUsername())); + Statement stat = conn.createStatement()) { + ResultSet rs = stat.executeQuery(sql); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + try { + columns.add(rs.getString(i)); + } catch (SQLException ignored) { + // Column count could change after schema evolution + columns.add(null); + } + } + results.add(String.join(" | ", columns)); + } + } + return results; } }