diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java index 81c40180407..cd2850316ee 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java @@ -27,6 +27,8 @@ import org.apache.flink.formats.json.JsonFormatOptions; import org.apache.flink.formats.json.JsonFormatOptionsUtil; +import java.time.ZoneId; + import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; @@ -46,7 +48,7 @@ public class ChangeLogJsonFormatFactory { * @return The configured instance of {@link SerializationSchema}. */ public static SerializationSchema createSerializationSchema( - ReadableConfig formatOptions, JsonSerializationType type) { + ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) { TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions); JsonFormatOptions.MapNullKeyMode mapNullKeyMode = JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions); @@ -62,6 +64,7 @@ public static SerializationSchema createSerializationSchema( timestampFormat, mapNullKeyMode, mapNullKeyLiteral, + zoneId, encodeDecimalAsPlainNumber); } case CANAL_JSON: @@ -70,6 +73,7 @@ public static SerializationSchema createSerializationSchema( timestampFormat, mapNullKeyMode, mapNullKeyLiteral, + zoneId, encodeDecimalAsPlainNumber); } default: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java index eecd2b80197..78548e31b77 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java @@ -81,12 +81,13 @@ public CanalJsonSerializationSchema( TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, + ZoneId zoneId, boolean encodeDecimalAsPlainNumber) { this.timestampFormat = timestampFormat; this.mapNullKeyMode = mapNullKeyMode; this.mapNullKeyLiteral = mapNullKeyLiteral; this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber; - this.zoneId = ZoneId.systemDefault(); + this.zoneId = zoneId; jsonSerializers = new HashMap<>(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java index 3fa1d4ce19f..2f305ce4258 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java @@ -80,12 +80,13 @@ public DebeziumJsonSerializationSchema( TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, + ZoneId zoneId, boolean encodeDecimalAsPlainNumber) { this.timestampFormat = timestampFormat; this.mapNullKeyMode = mapNullKeyMode; this.mapNullKeyLiteral = mapNullKeyLiteral; this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber; - this.zoneId = ZoneId.systemDefault(); + this.zoneId = zoneId; jsonSerializers = new HashMap<>(); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java index 9b117243fea..f993d63258c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java @@ -62,7 +62,7 @@ public DataSink createDataSink(Context context) { context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT); SerializationSchema valueSerialization = ChangeLogJsonFormatFactory.createSerializationSchema( - configuration, jsonSerializationType); + configuration, jsonSerializationType, zoneId); final Properties kafkaProperties = new Properties(); Map allOptions = context.getFactoryConfiguration().toMap(); allOptions.keySet().stream() diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java index 52181fd6bd1..c6335e6d69c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java @@ -40,6 +40,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.ZoneId; + /** Tests for {@link CanalJsonSerializationSchema}. */ public class CanalJsonSerializationSchemaTest { @@ -53,7 +55,9 @@ public void testSerialize() throws Exception { .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false); SerializationSchema serializationSchema = ChangeLogJsonFormatFactory.createSerializationSchema( - new Configuration(), JsonSerializationType.CANAL_JSON); + new Configuration(), + JsonSerializationType.CANAL_JSON, + ZoneId.systemDefault()); serializationSchema.open(new MockInitializationContext()); // create table diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java index 7a47bf8bd2d..0be02b9b382 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java @@ -40,6 +40,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.time.ZoneId; + /** Tests for {@link DebeziumJsonSerializationSchema}. */ public class DebeziumJsonSerializationSchemaTest { @@ -53,7 +55,9 @@ public void testSerialize() throws Exception { .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false); SerializationSchema serializationSchema = ChangeLogJsonFormatFactory.createSerializationSchema( - new Configuration(), JsonSerializationType.DEBEZIUM_JSON); + new Configuration(), + JsonSerializationType.DEBEZIUM_JSON, + ZoneId.systemDefault()); serializationSchema.open(new MockInitializationContext()); // create table Schema schema =