Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ public static LocalZonedTimestampData fromInstant(Instant instant) {
* milliseconds.
*/
public static boolean isCompact(int precision) {
return precision <= 3;
// We don't use compact mode to store any timestamps for now. See TimestampData#isCompact
// for more details.
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ public static TimestampData fromTimestamp(Timestamp timestamp) {
* Returns whether the timestamp data is small enough to be stored in a long of milliseconds.
*/
public static boolean isCompact(int precision) {
return precision <= 3;
// We don't use compact mode to store any timestamps for now since currently MySQL source
// could not correctly infer timestamp precision from Debezium records, and precision
// mismatch could cause downstream deserialization failure.
// By enforcing the non-compaction mode, we could ensure timestamp data with any precision
// could be correctly parsed.
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ public static ZonedTimestampData of(long millisecond, int nanoOfMillisecond, Str
* Returns whether the date-time part is small enough to be stored in a long of milliseconds.
*/
public static boolean isCompact(int precision) {
return precision <= 3;
// We don't use compact mode to store any timestamps for now. See TimestampData#isCompact
// for more details.
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
Expand Down Expand Up @@ -46,6 +49,7 @@
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -716,4 +720,118 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}

@Test
void testTimestampWithPrecision() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

TableId sourceTable =
TableId.tableId("default_namespace", "default_schema", "timestamp_table");
Schema sourceTableSchema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("ts0", DataTypes.TIMESTAMP(0))
.physicalColumn("ts1", DataTypes.TIMESTAMP(1))
.physicalColumn("ts2", DataTypes.TIMESTAMP(2))
.physicalColumn("ts3", DataTypes.TIMESTAMP(3))
.physicalColumn("ts4", DataTypes.TIMESTAMP(4))
.physicalColumn("ts5", DataTypes.TIMESTAMP(5))
.physicalColumn("ts6", DataTypes.TIMESTAMP(6))
.physicalColumn("ts_tz0", DataTypes.TIMESTAMP_TZ(0))
.physicalColumn("ts_tz1", DataTypes.TIMESTAMP_TZ(1))
.physicalColumn("ts_tz2", DataTypes.TIMESTAMP_TZ(2))
.physicalColumn("ts_tz3", DataTypes.TIMESTAMP_TZ(3))
.physicalColumn("ts_tz4", DataTypes.TIMESTAMP_TZ(4))
.physicalColumn("ts_tz5", DataTypes.TIMESTAMP_TZ(5))
.physicalColumn("ts_tz6", DataTypes.TIMESTAMP_TZ(6))
.physicalColumn("ts_ltz0", DataTypes.TIMESTAMP_LTZ(0))
.physicalColumn("ts_ltz1", DataTypes.TIMESTAMP_LTZ(1))
.physicalColumn("ts_ltz2", DataTypes.TIMESTAMP_LTZ(2))
.physicalColumn("ts_ltz3", DataTypes.TIMESTAMP_LTZ(3))
.physicalColumn("ts_ltz4", DataTypes.TIMESTAMP_LTZ(4))
.physicalColumn("ts_ltz5", DataTypes.TIMESTAMP_LTZ(5))
.physicalColumn("ts_ltz6", DataTypes.TIMESTAMP_LTZ(6))
.primaryKey("id")
.build();
List<Event> events = new ArrayList<>();
BinaryRecordDataGenerator sourceTableDataGenerator =
new BinaryRecordDataGenerator(
sourceTableSchema.getColumnDataTypes().toArray(new DataType[0]));

TimestampData timestampData = TimestampData.fromMillis(17_000_000_000L, 0);
ZonedTimestampData zonedTimestampData = ZonedTimestampData.of(17_000_000_000L, 0, "UTC");
LocalZonedTimestampData localZonedTimestampData =
LocalZonedTimestampData.fromEpochMillis(17_000_000_000L, 0);

events.add(new CreateTableEvent(sourceTable, sourceTableSchema));
events.add(
DataChangeEvent.insertEvent(
sourceTable,
sourceTableDataGenerator.generate(
new Object[] {
1,
timestampData,
timestampData,
timestampData,
timestampData,
timestampData,
timestampData,
timestampData,
zonedTimestampData,
zonedTimestampData,
zonedTimestampData,
zonedTimestampData,
zonedTimestampData,
zonedTimestampData,
zonedTimestampData,
localZonedTimestampData,
localZonedTimestampData,
localZonedTimestampData,
localZonedTimestampData,
localZonedTimestampData,
localZonedTimestampData,
localZonedTimestampData
})));

ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
Schema sinkTableSchema = ValuesDatabase.getTableSchema(sourceTable);
Assertions.assertThat(sinkTableSchema).isEqualTo(sourceTableSchema);
String[] outputEvents = outCaptor.toString().trim().split("\n");

Assertions.assertThat(outputEvents)
.isEqualTo(
new String[] {
"CreateTableEvent{tableId=default_namespace.default_schema.timestamp_table, schema=columns={`id` INT,`ts0` TIMESTAMP(0),`ts1` TIMESTAMP(1),`ts2` TIMESTAMP(2),`ts3` TIMESTAMP(3),`ts4` TIMESTAMP(4),`ts5` TIMESTAMP(5),`ts6` TIMESTAMP(6),`ts_tz0` TIMESTAMP(0) WITH TIME ZONE,`ts_tz1` TIMESTAMP(1) WITH TIME ZONE,`ts_tz2` TIMESTAMP(2) WITH TIME ZONE,`ts_tz3` TIMESTAMP(3) WITH TIME ZONE,`ts_tz4` TIMESTAMP(4) WITH TIME ZONE,`ts_tz5` TIMESTAMP(5) WITH TIME ZONE,`ts_tz6` TIMESTAMP(6) WITH TIME ZONE,`ts_ltz0` TIMESTAMP_LTZ(0),`ts_ltz1` TIMESTAMP_LTZ(1),`ts_ltz2` TIMESTAMP_LTZ(2),`ts_ltz3` TIMESTAMP_LTZ(3),`ts_ltz4` TIMESTAMP_LTZ(4),`ts_ltz5` TIMESTAMP_LTZ(5),`ts_ltz6` TIMESTAMP_LTZ(6)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.timestamp_table, before=[], after=[1, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20Z, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20, 1970-07-16T18:13:20], op=INSERT, meta=()}"
});
}
}
Loading