Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/flink_cdc_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ on:
description: "Flink CDC module to test against."
required: true
type: string
parallelism:
description: "Flink parallelism."
required: false
type: number
default: 4
custom-maven-parameter:
description: "Custom maven parameter."
required: false
Expand Down Expand Up @@ -210,7 +215,7 @@ jobs:

build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }${{ inputs.custom-maven-parameter }}"

mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules verify
mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} verify

- name: Print JVM thread dumps when cancelled
if: ${{ failure() }}
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/flink_cdc_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ jobs:
java-version: "[8]"
module: ${{ matrix.module }}
pipeline_e2e:
name: Pipeline E2E Tests
strategy:
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests (${{ matrix.parallelism }}-Parallelism)
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-version: "[8]"
flink-version: "['1.19.1', '1.20.0']"
module: pipeline_e2e
parallelism: ${{ matrix.parallelism }}
source_e2e:
name: Source E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/flink_cdc_ci_nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ jobs:
java-version: "[11]"
module: ${{ matrix.module }}
pipeline_e2e:
name: Pipeline E2E Tests
strategy:
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests (${{ matrix.parallelism }} Parallelism)
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-version: "[11]"
flink-version: "['1.19.1', '1.20.0']"
module: pipeline_e2e
parallelism: ${{ matrix.parallelism }}
source_e2e:
name: Source E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ADD_COLUMN;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new AddColumnEvent(newTableId, addedColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,9 @@ public boolean trimRedundantChanges() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_COLUMN_TYPE;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new AlterColumnTypeEvent(newTableId, typeMapping, oldTypeMapping);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.CREATE_TABLE;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new CreateTableEvent(newTableId, schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ public static DataChangeEvent projectAfter(
dataChangeEvent.meta);
}

/** Updates the {@link TableId} info of current data change event. */
public static DataChangeEvent route(DataChangeEvent dataChangeEvent, TableId tableId) {
return new DataChangeEvent(
tableId,
dataChangeEvent.before,
dataChangeEvent.after,
dataChangeEvent.op,
dataChangeEvent.meta);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.DROP_COLUMN;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new DropColumnEvent(newTableId, droppedColumnNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.DROP_TABLE;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new DropTableEvent(newTableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
*/
public class FlushEvent implements Event {

/** The schema changes from which table. */
private final TableId tableId;
/** Which subTask ID this FlushEvent was initiated from. */
private final int sourceSubTaskId;

public FlushEvent(TableId tableId) {
this.tableId = tableId;
public FlushEvent(int sourceSubTaskId) {
this.sourceSubTaskId = sourceSubTaskId;
}

public TableId getTableId() {
return tableId;
public int getSourceSubTaskId() {
return sourceSubTaskId;
}

@Override
Expand All @@ -45,11 +45,16 @@ public boolean equals(Object o) {
return false;
}
FlushEvent that = (FlushEvent) o;
return Objects.equals(tableId, that.tableId);
return sourceSubTaskId == that.sourceSubTaskId;
}

@Override
public int hashCode() {
return Objects.hash(tableId);
return Objects.hash(sourceSubTaskId);
}

@Override
public String toString() {
return "FlushEvent{" + "sourceSubTaskId=" + sourceSubTaskId + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.RENAME_COLUMN;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new RenameColumnEvent(newTableId, nameMapping);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@
public interface SchemaChangeEvent extends ChangeEvent, Serializable {
/** Returns its {@link SchemaChangeEventType}. */
SchemaChangeEventType getType();

/** Creates a copy of {@link SchemaChangeEvent} with new {@link TableId}. */
SchemaChangeEvent copy(TableId newTableId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class TableId implements Serializable {
@Nullable private final String namespace;
@Nullable private final String schemaName;
private final String tableName;
private transient int cachedHashCode;

private TableId(@Nullable String namespace, @Nullable String schemaName, String tableName) {
this.namespace = namespace;
Expand Down Expand Up @@ -125,7 +126,10 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(namespace, schemaName, tableName);
if (cachedHashCode == 0) {
cachedHashCode = Objects.hash(namespace, schemaName, tableName);
}
return cachedHashCode;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.TRUNCATE_TABLE;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new TruncateTableEvent(newTableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public class RouteRule implements Serializable {

private static final long serialVersionUID = 1L;

public RouteRule(String sourceTable, String sinkTable) {
this(sourceTable, sinkTable, null);
}

public RouteRule(String sourceTable, String sinkTable, String replaceSymbol) {
this.sourceTable = sourceTable;
this.sinkTable = sinkTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public class Schema implements Serializable {
// Used to index column by name
private transient volatile Map<String, Column> nameToColumns;

/**
* Schema might be used as a LoadingCache key frequently, and maintaining a cache of hashCode
* would be more efficient.
*/
private transient int cachedHashCode;

private Schema(
List<Column> columns,
List<String> primaryKeys,
Expand Down Expand Up @@ -186,7 +192,10 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(columns, primaryKeys, partitionKeys, options, comment);
if (cachedHashCode == 0) {
cachedHashCode = Objects.hash(columns, primaryKeys, partitionKeys, options, comment);
}
return cachedHashCode;
}

// -----------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.cdc.common.source;

import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.PublicEvolving;

/**
Expand All @@ -36,4 +37,18 @@ public interface DataSource {
default SupportedMetadataColumn[] supportedMetadataColumns() {
return new SupportedMetadataColumn[0];
}

/**
* Indicating if this source may generate metadata events (SchemaChangeEvents) in parallel for
* each table. If returns {@code false}, you'll get a regular operator topology that is
* compatible with single-incremented sources like MySQL. Returns {@code true} for sources that
* does not maintain a globally sequential schema change events stream, like MongoDB or Kafka.
* <br>
* Note that new topology still an experimental feature. Return {@code false} by default to
* avoid unexpected behaviors.
*/
@Experimental
default boolean isParallelMetadataSource() {
return false;
}
}
Loading