diff --git a/chunjun-connectors/chunjun-connector-hbase-1.4/pom.xml b/chunjun-connectors/chunjun-connector-hbase-1.4/pom.xml
index 1e77e74b03..96c9b539c0 100644
--- a/chunjun-connectors/chunjun-connector-hbase-1.4/pom.xml
+++ b/chunjun-connectors/chunjun-connector-hbase-1.4/pom.xml
@@ -125,6 +125,10 @@
org.apache.http
com.dtstack.chunjun.connector.hbase14.org.apache.http
+
+ com.google.protobuf
+ com.dtstack.chunjun.connector.hbase14.com.google.protobuf
+
diff --git a/chunjun-connectors/chunjun-connector-hive/pom.xml b/chunjun-connectors/chunjun-connector-hive/pom.xml
index 71e641fb64..5d609cd6ec 100644
--- a/chunjun-connectors/chunjun-connector-hive/pom.xml
+++ b/chunjun-connectors/chunjun-connector-hive/pom.xml
@@ -282,6 +282,12 @@
+
+
+ com.google.protobuf
+ com.dtstack.chunjun.connector.hive.com.google.protobuf
+
+
diff --git a/chunjun-connectors/chunjun-connector-inceptor/pom.xml b/chunjun-connectors/chunjun-connector-inceptor/pom.xml
index fd841a2549..02d0f7316f 100644
--- a/chunjun-connectors/chunjun-connector-inceptor/pom.xml
+++ b/chunjun-connectors/chunjun-connector-inceptor/pom.xml
@@ -82,6 +82,10 @@
com.esotericsoftware.kryo
shade.inceptor.com.esotericsoftware.kryo
+
+ com.google.protobuf
+ shade.inceptor.com.google.protobuf
+
-
- chunjun-connectors
- com.dtstack.chunjun
- 1.12-SNAPSHOT
-
- 4.0.0
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ chunjun-connectors
+ com.dtstack.chunjun
+ 1.12-SNAPSHOT
+
+ 4.0.0
- chunjun-connector-kafka
+ chunjun-connector-kafka
ChunJun : Connectors : Kafka
-
-
- org.apache.flink
- flink-connector-kafka_${scala.binary.version}
- ${flink.version}
-
-
- flink-core
- org.apache.flink
-
-
-
+
+
+ org.apache.flink
+ flink-connector-kafka_${scala.binary.version}
+ ${flink.version}
+
+
+ flink-core
+ org.apache.flink
+
+
+
com.pingcap.ticdc.cdc
ticdc-decoder
5.2.0-SNAPSHOT
-
-
- com.alibaba
- fastjson
-
-
+
+
+ com.alibaba
+ fastjson
+
+
-
- com.alibaba
- fastjson
- 1.2.79
-
+
+ com.alibaba
+ fastjson
+ 1.2.79
+
org.apache.flink
@@ -54,13 +54,13 @@
avro
1.10.0
-
+
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
3.2.4
@@ -70,11 +70,11 @@
false
-
-
- com.alibaba.fastjson
- com.shade.alibaba.fastjson
-
+
+
+ com.alibaba.fastjson
+ com.shade.alibaba.fastjson
+
org.apache.avro
@@ -88,7 +88,11 @@
org.apache.commons.compress
org.apache.flink.avro.shaded.org.apache.commons.compress
-
+
+ com.google.protobuf
+ com.dtstack.chunjun.connector.kafka.shaded.com.google.protobuf
+
+
org.slf4j
@@ -116,28 +120,50 @@
META-INF/*.RSA
+
+
+ org.apache.flink:flink-avro
+
+ META-INF/services/*
+
+
+
+ org.apache.flink:flink-connector-kafka
+
+ META-INF/services/*
+
+
+
+ org.glassfish.jersey.core:jersey-common
+
+ META-INF/services/*
+
+
+
+
+
-
- maven-antrun-plugin
-
-
- copy-resources
-
- package
-
- run
-
-
-
-
+ maven-antrun-plugin
+
+
+ copy-resources
+
+ package
+
+ run
+
+
+
+
-
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/chunjun-connectors/chunjun-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 63bf1d6180..d840aac69f 100644
--- a/chunjun-connectors/chunjun-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/chunjun-connectors/chunjun-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,5 +15,3 @@
com.dtstack.chunjun.connector.kafka.table.KafkaDynamicTableFactory
com.dtstack.chunjun.connector.upsertkafka.table.UpsertKafkaDynamicTableFactory
-org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory
-org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroFormatFactory
diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/constants/ConstantValue.java b/chunjun-core/src/main/java/com/dtstack/chunjun/constants/ConstantValue.java
index b32bd16b33..e7b5eb51d4 100644
--- a/chunjun-core/src/main/java/com/dtstack/chunjun/constants/ConstantValue.java
+++ b/chunjun-core/src/main/java/com/dtstack/chunjun/constants/ConstantValue.java
@@ -72,6 +72,8 @@ public class ConstantValue {
public static final String CONNECTOR_DIR_NAME = "connector";
+ public static final String FORMAT_DIR_NAME = "formats";
+
public static final String DIRTY_DATA_DIR_NAME = "dirty-data-collector";
public static final String RESTORE_DIR_NAME = "restore-plugins";
diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/util/ConnectorNameConvertUtil.java b/chunjun-core/src/main/java/com/dtstack/chunjun/util/ConnectorNameConvertUtil.java
index 1a0a0eb31a..ca3677e494 100644
--- a/chunjun-core/src/main/java/com/dtstack/chunjun/util/ConnectorNameConvertUtil.java
+++ b/chunjun-core/src/main/java/com/dtstack/chunjun/util/ConnectorNameConvertUtil.java
@@ -40,6 +40,7 @@ public class ConnectorNameConvertUtil {
connectorNameMap.put("adbpostgresql", new Tuple2<>("postgresql", "postgresql"));
connectorNameMap.put("dorisbatch", new Tuple2<>("doris", "doris"));
connectorNameMap.put("gbase", new Tuple2<>("gBase", "gBase"));
+ connectorNameMap.put("protobuf", new Tuple2<>("pbformat", "pbformat"));
}
public static String convertClassPrefix(String originName) {
diff --git a/chunjun-core/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/chunjun-core/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index dceeef0132..748471fe08 100644
--- a/chunjun-core/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ b/chunjun-core/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -49,6 +49,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
@@ -654,6 +655,14 @@ private Optional discoverOptionalFormatFactory(
if (identifier == null) {
return Optional.empty();
}
+ if (identifier.toLowerCase(Locale.ROOT).endsWith("-x")) {
+ String s = identifier.substring(0, identifier.length() - 2);
+ FactoryHelper factoryHelper = factoryHelperThreadLocal.get();
+ if (factoryHelper != null) {
+ factoryHelper.registerCachedFile(
+ s, context.getClassLoader(), ConstantValue.FORMAT_DIR_NAME);
+ }
+ }
final F factory =
discoverFactory(context.getClassLoader(), formatFactoryClass, identifier);
String formatPrefix = formatPrefix(factory, formatOption);
diff --git a/chunjun-examples/sql/kafka/proto_proto.sql b/chunjun-examples/sql/kafka/proto_proto.sql
new file mode 100644
index 0000000000..dac088b590
--- /dev/null
+++ b/chunjun-examples/sql/kafka/proto_proto.sql
@@ -0,0 +1,133 @@
+--complex with flink original kafka-connector
+CREATE TABLE reader (
+ GroupInfo MAP
+ ,Messages ARRAY<
+ ROW<
+ TAGNAME VARCHAR ,
+ TagValue ROW<
+ `Value` ROW<
+ ValueCase INTEGER,
+ ValueBool BOOlEAN,
+ ArrayBool ROW<`Values` ARRAY>,
+ ValueInt32 INTEGER,
+ ArrayInt32 ROW<`Values` ARRAY>,
+ ValueUint32 INTEGER,
+ ArrayUint32 ROW<`Values` ARRAY>,
+ ValueInt64 BIGINT,
+ ArrayInt64 ROW<`Values` ARRAY>,
+ ValueUint64 BIGINT,
+ ArrayUint64 ROW<`Values` ARRAY>,
+ ValueFloat FLOAT,
+ ArrayFloat ROW<`Values` ARRAY>,
+ ValueDouble DOUBLE,
+ ArrayDouble ROW<`Values` ARRAY>,
+ ValueString STRING,
+ ArrayString ROW<`Values` ARRAY>,
+ ValueBytes BINARY,
+ ValueTimestamp BIGINT
+ >,
+ boolx BOOLEAN,
+ Value2 ROW<
+ ValueCase INTEGER ,
+ Value2Value BOOLEAN,
+ ArrayBool2 ROW<`Values` ARRAY>>,
+ booly BOOLEAN
+ >,
+ UaDataType INTEGER,
+ Quality BOOLEAN,
+ `Timestamp` BIGINT,
+ TagInfos MAP,
+ ExValues MAP
+ >
+ >
+
+
+
+-- , `topic` STRING METADATA VIRTUAL -- from Kafka connector
+-- , `leader-epoch` int METADATA VIRTUAL -- from Kafka connector
+-- , `offset` BIGINT METADATA VIRTUAL -- from Kafka connector
+-- , ts TIMESTAMP(3) METADATA FROM 'timestamp' -- from Kafka connector
+-- , `timestamp-type` STRING METADATA VIRTUAL -- from Kafka connector
+-- , partition_id BIGINT METADATA FROM 'partition' VIRTUAL -- from Kafka connector
+
+) WITH (
+ 'connector' = 'kafka-x'
+ ,'topic' = 'liuliu_proto_source'
+ ,'properties.bootstrap.servers' = 'flink01:9092'
+ ,'properties.group.id' = 'luna_g'
+ ,'scan.startup.mode' = 'earliest-offset'
+ ,'format' = 'protobuf-x'
+-- ,'protobuf-x.class.name'='ZPMC.Message.MessageGroupOuterClass'
+-- ,'protobuf-x.message.name'='MessageGroup'
+ ,'protobuf-x.message-class-name' = 'ZPMC.Message.MessageGroupOuterClass$MessageGroup'
+ ,'scan.parallelism' = '1'
+ );
+
+CREATE TABLE writer (
+ GroupInfo MAP
+ ,Messages ARRAY<
+ ROW<
+ TAGNAME VARCHAR ,
+ TagValue ROW<
+ `Value` ROW<
+ ValueCase INTEGER,
+ ValueBool BOOlEAN,
+ ArrayBool ROW<`Values` ARRAY>,
+ ValueInt32 INTEGER,
+ ArrayInt32 ROW<`Values` ARRAY>,
+ ValueUint32 INTEGER,
+ ArrayUint32 ROW<`Values` ARRAY>,
+ ValueInt64 BIGINT,
+ ArrayInt64 ROW<`Values` ARRAY>,
+ ValueUint64 BIGINT,
+ ArrayUint64 ROW<`Values` ARRAY>,
+ ValueFloat FLOAT,
+ ArrayFloat ROW<`Values` ARRAY>,
+ ValueDouble DOUBLE,
+ ArrayDouble ROW<`Values` ARRAY>,
+ ValueString STRING,
+ ArrayString ROW<`Values` ARRAY>,
+ ValueBytes BINARY,
+ ValueTimestamp BIGINT
+ >,
+ boolx BOOLEAN,
+ Value2 ROW<
+ ValueCase INTEGER ,
+ Value2Value BOOLEAN,
+ ArrayBool2 ROW<`Values` ARRAY>>,
+ booly BOOLEAN
+ >,
+ UaDataType INTEGER,
+ Quality BOOLEAN,
+ `Timestamp` BIGINT,
+ TagInfos MAP,
+ ExValues MAP
+ >
+ >
+
+
+--
+-- , `topic` STRING METADATA VIRTUAL -- from Kafka connector
+-- , `leader-epoch` int METADATA VIRTUAL -- from Kafka connector
+-- , `offset` BIGINT METADATA VIRTUAL -- from Kafka connector
+-- , ts TIMESTAMP(3) METADATA FROM 'timestamp' -- from Kafka connector
+-- , `timestamp-type` STRING METADATA VIRTUAL -- from Kafka connector
+-- , partition_id BIGINT METADATA FROM 'partition' VIRTUAL -- from Kafka connector
+
+) WITH (
+ 'connector' = 'kafka-x'
+ ,'topic' = 'liuliu_proto_sink'
+ ,'properties.bootstrap.servers' = 'flink01:9092'
+ ,'properties.group.id' = 'luna_g'
+ ,'scan.startup.mode' = 'earliest-offset'
+ ,'format' = 'protobuf-x'
+-- ,'protobuf-x.class.name'='ZPMC.Message.MessageGroupOuterClass'
+-- ,'protobuf-x.message.name'='MessageGroup'
+ ,'protobuf-x.message-class-name' = 'ZPMC.Message.MessageGroupOuterClass$MessageGroup'
+ ,'scan.parallelism' = '1'
+ );
+
+INSERT INTO writer
+SELECT *
+from reader
+
diff --git a/chunjun-formats/chunjun-protobuf/pom.xml b/chunjun-formats/chunjun-protobuf/pom.xml
new file mode 100644
index 0000000000..d1cf68cc89
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/pom.xml
@@ -0,0 +1,91 @@
+
+
+
+ chunjun-formats
+ com.dtstack.chunjun
+ 1.12-SNAPSHOT
+
+ 4.0.0
+
+ flinkx-protobuf
+
+
+ 8
+ 8
+
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.20.1-rc-1
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.1.0
+
+
+ package
+
+ shade
+
+
+
+
+ org.slf4j:slf4j-api
+ log4j:log4j
+ ch.qos.logback:*
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+ maven-antrun-plugin
+
+
+ copy-resources
+
+ package
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatFactory.java b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatFactory.java
new file mode 100644
index 0000000000..4d3cecc17b
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.protobuf;
+
+import com.dtstack.chunjun.format.protobuf.deserialize.PbRowDataDeserializationSchema;
+import com.dtstack.chunjun.format.protobuf.serialize.PbRowDataSerializationSchema;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** @author liuliu 2022/4/8 */
+public class PbFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "protobuf-x";
+
+ @Override
+ public DecodingFormat> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+ String messageClassName = formatOptions.get(PbFormatOptions.MESSAGE_CLASS_NAME);
+ return new DecodingFormat>() {
+ @Override
+ public DeserializationSchema createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType physicalDataType) {
+ final TypeInformation rowDataTypeInfo =
+ context.createTypeInformation(physicalDataType);
+ final RowType rowType = (RowType) physicalDataType.getLogicalType();
+ return new PbRowDataDeserializationSchema(
+ rowType, rowDataTypeInfo, messageClassName);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
+ @Override
+ public EncodingFormat> createEncodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+ String messageClassName = formatOptions.get(PbFormatOptions.MESSAGE_CLASS_NAME);
+ return new EncodingFormat>() {
+ @Override
+ public SerializationSchema createRuntimeEncoder(
+ DynamicTableSink.Context context, DataType physicalDataType) {
+ final RowType rowType = (RowType) physicalDataType.getLogicalType();
+ return new PbRowDataSerializationSchema(rowType, messageClassName);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ Set> options = new HashSet<>();
+ options.add(PbFormatOptions.MESSAGE_CLASS_NAME);
+ return options;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ return new HashSet<>();
+ }
+}
diff --git a/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatOptions.java b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatOptions.java
new file mode 100644
index 0000000000..5c9f93c994
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.protobuf;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** @author liuliu 2022/4/18 */
+public class PbFormatOptions {
+ private PbFormatOptions() {}
+
+ public static final ConfigOption MESSAGE_CLASS_NAME =
+ ConfigOptions.key("message-class-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Required option to specify the full name of protobuf message class. The protobuf class "
+ + "must be located in the classpath both in client and task side");
+}
diff --git a/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatType.java b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatType.java
new file mode 100644
index 0000000000..e3e12e2658
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbFormatType.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.protobuf;
+
+import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
+import com.dtstack.chunjun.throwable.UnsupportedTypeException;
+
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.Descriptors;
+
+/** @author liuliu 2022/4/11 */
+public enum PbFormatType {
+ MAP,
+ ENUM,
+ ARRAY,
+ MESSAGE,
+ BOOLEAN,
+ INT,
+ LONG,
+ FLOAT,
+ DOUBLE,
+ STRING,
+ BYTE_STRING;
+
+ PbFormatType() {}
+
+ public static PbFormatType getTypeByFieldDescriptor(
+ Descriptors.FieldDescriptor fieldDescriptor) {
+ if (fieldDescriptor.isMapField()) {
+ return MAP;
+ } else if (fieldDescriptor.isRepeated()) {
+ return ARRAY;
+ } else {
+ return getTypeByTypeName(fieldDescriptor.getJavaType().name());
+ }
+ }
+
+ public static PbFormatType getArrayInnerTypeByFieldDescriptor(
+ Descriptors.FieldDescriptor fieldDescriptor) {
+ return getTypeByTypeName(fieldDescriptor.getJavaType().name());
+ }
+
+ public static PbFormatType getProtoTypeForMapKey(Class clazz) {
+ switch (clazz.getName()) {
+ case "java.lang.String":
+ return STRING;
+ case "java.lang.Integer":
+ case "int":
+ return INT;
+ case "java.lang.Long":
+ case "long":
+ return LONG;
+ default:
+ throw new ChunJunRuntimeException(
+ String.format(
+ "Map key expect any scalar type except for floating point types and bytes,but it is %s",
+ clazz.getName()));
+ }
+ }
+
+ public static PbFormatType getProtoTypeForMapValue(Class clazz) {
+ if (com.google.protobuf.ProtocolMessageEnum.class.isAssignableFrom(clazz)) {
+ return ENUM;
+ } else if (AbstractMessage.class.isAssignableFrom(clazz)) {
+ return MESSAGE;
+ }
+ switch (clazz.getName()) {
+ case "java.lang.String":
+ return STRING;
+ case "java.lang.Integer":
+ case "int":
+ return INT;
+ case "java.lang.Double":
+ case "double":
+ return DOUBLE;
+ case "java.lang.Long":
+ case "long":
+ return LONG;
+ case "java.lang.Float":
+ case "float":
+ return FLOAT;
+ case "com.google.protobuf.ByteString":
+ return BYTE_STRING;
+ default:
+ throw new UnsupportedOperationException(clazz.getName());
+ }
+ }
+
+ public static PbFormatType getTypeByTypeName(String typeName) {
+ for (PbFormatType protoType : PbFormatType.values()) {
+ if (typeName.equalsIgnoreCase(protoType.name())) {
+ return protoType;
+ }
+ }
+ throw new UnsupportedTypeException(typeName);
+ }
+}
diff --git a/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbMessageAdaptor.java b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbMessageAdaptor.java
new file mode 100644
index 0000000000..ce328124d2
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/PbMessageAdaptor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.protobuf;
+
+import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
+
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.Descriptors;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+/** @author liuliu 2022/4/27 */
+public class PbMessageAdaptor {
+ protected final int[] normalToOneof;
+ protected final int[] oneOfLastIndex;
+ protected int size;
+
+ public PbMessageAdaptor(
+ List fieldDescriptorList,
+ List oneofDescriptorList) {
+ size = fieldDescriptorList.size();
+ oneOfLastIndex = new int[oneofDescriptorList.size()];
+ normalToOneof = new int[fieldDescriptorList.size()];
+ Arrays.fill(normalToOneof, -1);
+ for (int i = 0; i < oneofDescriptorList.size(); i++) {
+ Descriptors.OneofDescriptor oneofDescriptor = oneofDescriptorList.get(i);
+ for (Descriptors.FieldDescriptor fieldDescriptor : oneofDescriptor.getFields()) {
+ normalToOneof[fieldDescriptor.getIndex()] = i;
+ oneOfLastIndex[i] = fieldDescriptor.getIndex();
+ size--;
+ }
+ size++;
+ }
+ }
+
+ /** Check whether the protobuf field corresponding to index belongs to a oneof field */
+ public boolean isOneOf(int index) {
+ return normalToOneof[index] != -1;
+ }
+
+ /**
+ * If the protobuf field corresponding to index belongs to a oneof field, then return the index
+ * of the last field of the oneof
+ */
+ public Integer getLastOneOfIndex(int index) {
+ int oneofIndex = normalToOneof[index];
+ if (oneofIndex != -1) {
+ return oneOfLastIndex[oneofIndex];
+ }
+ throw new ChunJunRuntimeException("");
+ }
+
+ public Method obtainMethod(String methodName, Class clazz) {
+ try {
+ Method method =
+ Arrays.stream(clazz.getDeclaredMethods())
+ .filter(m -> m.getName().equalsIgnoreCase(methodName))
+ .filter(
+ m ->
+ m.getParameterTypes().length == 0
+ || !AbstractMessage.Builder.class
+ .isAssignableFrom(
+ m.getParameterTypes()[0]))
+ .toArray(Method[]::new)[0];
+ method.setAccessible(true);
+ return method;
+ } catch (Exception e) {
+ throw new ChunJunRuntimeException(
+ String.format(
+ "failed to obtain getMethod[%s] from class[%s]",
+ methodName, clazz.getName()));
+ }
+ }
+
+ public int getSize() {
+ return size;
+ }
+}
diff --git a/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbMessageGetter.java b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbMessageGetter.java
new file mode 100644
index 0000000000..d5a5f61cfe
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbMessageGetter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.protobuf.deserialize;
+
+import com.dtstack.chunjun.format.protobuf.PbMessageAdaptor;
+import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
+
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.Descriptors;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Reflect all GET methods in the order of the fields in the protobuf message, then we can get the
+ * values of all the fields in the Message object by index.
+ *
+ * @author liuliu 2022/4/13
+ */
+public class PbMessageGetter extends PbMessageAdaptor {
+
+ private Method[] normalMethods;
+ private Method[] oneofCaseMethods;
+ private Method[] oneOfNumberMethods;
+
+ public PbMessageGetter(
+ List fieldDescriptorList,
+ List oneofDescriptorList,
+ Class extends AbstractMessage> clazz) {
+ super(fieldDescriptorList, oneofDescriptorList);
+ initMethods(fieldDescriptorList, oneofDescriptorList, clazz);
+ }
+
+ /**
+ * Get the value in the protocolMessage object by fieldDescriptor index
+ *
+ * @param object protocolMessage
+ * @param index index of fieldDescriptor
+ */
+ public Object getByIndex(Object object, int index) {
+ try {
+ return normalMethods[index].invoke(object);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new ChunJunRuntimeException(
+ String.format(
+ "failed to get filed from flink type[%s],index[%s]", object, index),
+ e);
+ }
+ }
+
+ /**
+ * If the index is a member of oneof,returns the corresponding index of the valid value in oneof
+ *
+ * @param object protocolMessage
+ * @param index index of fieldDescriptor
+ */
+ public Integer getOneofCase(Object object, int index)
+ throws InvocationTargetException, IllegalAccessException {
+ int oneofIndex = normalToOneof[index];
+ return (Integer)
+ oneOfNumberMethods[oneofIndex].invoke(oneofCaseMethods[oneofIndex].invoke(object));
+ }
+
+ public void initMethods(
+ List fieldDescriptorList,
+ List oneofDescriptorList,
+ Class extends AbstractMessage> clazz) {
+ this.normalMethods =
+ fieldDescriptorList.stream()
+ .map(fieldDescriptor -> obtainNormalGetMethod(fieldDescriptor, clazz))
+ .toArray(Method[]::new);
+ this.oneofCaseMethods =
+ oneofDescriptorList.stream()
+ .map(oneofDescriptor -> obtainOneofCaseMethod(oneofDescriptor, clazz))
+ .toArray(Method[]::new);
+ this.oneOfNumberMethods =
+ oneofDescriptorList.stream()
+ .map(oneofDescriptor -> obtainOneofNumberMethod(oneofDescriptor, clazz))
+ .toArray(Method[]::new);
+ }
+
+ public Method obtainOneofNumberMethod(
+ Descriptors.OneofDescriptor oneofDescriptor, Class extends AbstractMessage> clazz) {
+ Class caseClass;
+ try {
+ caseClass = Class.forName(clazz.getName() + "$" + oneofDescriptor.getName() + "Case");
+ } catch (ClassNotFoundException e) {
+ throw new ChunJunRuntimeException(
+ String.format(
+ "failed to get OneOfCase Class,caseName=%s",
+ oneofDescriptor.getName()));
+ }
+ return obtainMethod("getNumber", caseClass);
+ }
+
+ public Method obtainOneofCaseMethod(
+ Descriptors.OneofDescriptor oneofDescriptor, Class extends AbstractMessage> clazz) {
+ return obtainMethod("get" + oneofDescriptor.getName() + "Case", clazz);
+ }
+
+ /**
+ * obtain get Method . methodName = get+descriptorName. if repeated,methodName =
+ * get+descriptorName+List
+ */
+ public Method obtainNormalGetMethod(
+ Descriptors.FieldDescriptor fieldDescriptor, Class extends AbstractMessage> clazz) {
+ StringBuilder stringBuilder = new StringBuilder("get");
+ stringBuilder.append(fieldDescriptor.getName());
+ if (!fieldDescriptor.isMapField() && fieldDescriptor.isRepeated()) {
+ stringBuilder.append("List");
+ }
+ return obtainMethod(stringBuilder.toString(), clazz);
+ }
+}
diff --git a/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbParser.java b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbParser.java
new file mode 100644
index 0000000000..1a4f3c594a
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbParser.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.protobuf.deserialize;
+
+import com.dtstack.chunjun.format.protobuf.util.PbReflectUtil;
+import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
+
+import java.lang.reflect.Method;
+
+/**
+ * trans byte[] to protobuf object by protobuf parseFrom method
+ *
+ * @author liuliu 2022/4/8
+ */
+public class PbParser {
+
+ private Class messageClass;
+ private Method parseMethod;
+
+ public PbParser(String messageClassName) throws NoSuchMethodException {
+ messageClass = PbReflectUtil.getClassByClassName(messageClassName);
+ parseMethod = messageClass.getMethod("parseFrom", byte[].class);
+ parseMethod.setAccessible(true);
+ }
+
+ public Object parse(byte[] bytes) {
+ try {
+ return parseMethod.invoke(messageClass, bytes);
+ } catch (Exception e) {
+ throw new ChunJunRuntimeException(
+ String.format(
+ "Failed to deserialize protocol data from byte[] to message object,messageClass=%s",
+ messageClass),
+ e);
+ }
+ }
+}
diff --git a/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbRowDataDeserializationSchema.java b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbRowDataDeserializationSchema.java
new file mode 100644
index 0000000000..aea96a9b6d
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbRowDataDeserializationSchema.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.protobuf.deserialize;
+
+import com.dtstack.chunjun.format.protobuf.util.FormatCheckUtil;
+import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Objects;
+
+/** @author liuliu 2022/4/8 */
+public class PbRowDataDeserializationSchema implements DeserializationSchema {
+
+ private static final long serialVersionUID = 1L;
+
+ private PbParser protoMessageTransformer;
+ private PbToRowDataPbConverter.ProtoToRowDataConverter runtimeConverter;
+ private TypeInformation typeInformation;
+ private String messageClassName;
+ private RowType rowType;
+
+ public PbRowDataDeserializationSchema(
+ RowType rowType, TypeInformation typeInformation, String protoOutClassName) {
+ this.rowType = rowType;
+ this.typeInformation = typeInformation;
+ this.messageClassName = protoOutClassName;
+ new FormatCheckUtil(rowType, messageClassName).isValid();
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ protoMessageTransformer = new PbParser(messageClassName);
+ runtimeConverter = PbToRowDataPbConverter.initMessageDataConverter(messageClassName);
+ }
+
+ @Override
+ public RowData deserialize(byte[] message) {
+ if (message == null) {
+ return null;
+ }
+ Object protoMessage = protoMessageTransformer.parse(message);
+ try {
+ return (RowData) runtimeConverter.convert(protoMessage);
+ } catch (Exception e) {
+ throw new ChunJunRuntimeException("Failed to convert protobuf record to RowData", e);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return typeInformation;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PbRowDataDeserializationSchema that = (PbRowDataDeserializationSchema) o;
+ return messageClassName.equals(that.messageClassName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(messageClassName);
+ }
+}
diff --git a/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbToRowDataPbConverter.java b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbToRowDataPbConverter.java
new file mode 100644
index 0000000000..6076551c76
--- /dev/null
+++ b/chunjun-formats/chunjun-protobuf/src/main/java/com/dtstack/chunjun/format/protobuf/deserialize/PbToRowDataPbConverter.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.format.protobuf.deserialize;
+
+import com.dtstack.chunjun.format.protobuf.PbFormatType;
+import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.dtstack.chunjun.format.protobuf.util.PbReflectUtil.getClassByFieldDescriptor;
+import static com.dtstack.chunjun.format.protobuf.util.PbReflectUtil.getDescriptorByMessageClassName;
+import static com.dtstack.chunjun.format.protobuf.util.PbReflectUtil.getMapTypeTuple;
+
+/** @author liuliu 2022/4/11 Converte from proto to RowData */
+public class PbToRowDataPbConverter {
+
+ /** proto class full name example as com.dtstack.messageOutClass */
+ protected static String PROTO_CLASS_NAME;
+ /** proto class package name example as com.dtstack */
+ protected static String PROTO_PACKAGE_NAME;
+
+ public interface ProtoToRowDataConverter extends Serializable {
+ Object convert(Object object) throws InvocationTargetException, IllegalAccessException;
+ }
+
+ /**
+ * create a runtime converter for row by specific protoMessageClassName and init {@link
+ * PbToRowDataPbConverter#PROTO_PACKAGE_NAME} and {@link
+ * PbToRowDataPbConverter#PROTO_CLASS_NAME}
+ */
+ public static ProtoToRowDataConverter initMessageDataConverter(String messageClassName) {
+ try {
+ PROTO_CLASS_NAME = messageClassName.substring(0, messageClassName.lastIndexOf("$"));
+ PROTO_PACKAGE_NAME = PROTO_CLASS_NAME.substring(0, PROTO_CLASS_NAME.lastIndexOf("."));
+ return createMessageDataConverter(messageClassName);
+ } catch (IndexOutOfBoundsException e) {
+ throw new ChunJunRuntimeException(
+ String.format("Incorrect proto message class name:%s", messageClassName), e);
+ }
+ }
+
+ /**
+ * create a runtime converter for row by specific messageClassName example as
+ * com.dtstack.MessageOutClass$Message
+ */
+ public static ProtoToRowDataConverter createMessageDataConverter(String protoMessageClassName) {
+ Tuple2>
+ descriptorByMessageClassName =
+ getDescriptorByMessageClassName(protoMessageClassName);
+
+ return createMessageDataConverter(
+ descriptorByMessageClassName.f0.getFields(),
+ descriptorByMessageClassName.f0.getOneofs(),
+ descriptorByMessageClassName.f1);
+ }
+
+ /** create a runtime converter for row by specific fieldDescriptor */
+ public static ProtoToRowDataConverter createMessageDataConverter(
+ Descriptors.FieldDescriptor fieldDescriptor) {
+
+ Descriptors.Descriptor messageType = fieldDescriptor.getMessageType();
+ return createMessageDataConverter(
+ messageType.getFields(),
+ messageType.getOneofs(),
+ getClassByFieldDescriptor(
+ PROTO_CLASS_NAME, PROTO_PACKAGE_NAME, fieldDescriptor, null));
+ }
+
+ public static ProtoToRowDataConverter createMessageDataConverter(
+ List fieldDescriptorList,
+ List oneofDescriptorList,
+ Class extends AbstractMessage> clazz) {
+ PbMessageGetter pbMessageGetter =
+ new PbMessageGetter(fieldDescriptorList, oneofDescriptorList, clazz);
+ ProtoToRowDataConverter[] protoToRowDataConverters =
+ fieldDescriptorList.stream()
+ .map(PbToRowDataPbConverter::createNullableConverter)
+ .toArray(ProtoToRowDataConverter[]::new);
+ return object -> {
+ int size = pbMessageGetter.getSize();
+ GenericRowData genericRowData = new GenericRowData(size);
+ for (int i = 0, index = 0; i < protoToRowDataConverters.length && index < size; ) {
+ if (pbMessageGetter.isOneOf(i)) {
+ int lastIndex = pbMessageGetter.getLastOneOfIndex(i);
+ GenericRowData oneofGenericRowData = new GenericRowData(lastIndex - i + 2);
+ oneofGenericRowData.setField(0, pbMessageGetter.getOneofCase(object, i));
+ int currentCase = 1;
+ while (currentCase < oneofGenericRowData.getArity()) {
+ Object o = pbMessageGetter.getByIndex(object, i);
+ oneofGenericRowData.setField(
+ currentCase++, protoToRowDataConverters[i++].convert(o));
+ }
+ genericRowData.setField(index++, oneofGenericRowData);
+ } else {
+ Object o = pbMessageGetter.getByIndex(object, i);
+ genericRowData.setField(index++, protoToRowDataConverters[i++].convert(o));
+ }
+ }
+ return genericRowData;
+ };
+ }
+
+ /** Creates a runtime converter which is null safe. */
+ private static ProtoToRowDataConverter createNullableConverter(
+ Descriptors.FieldDescriptor descriptor) {
+ return createNullableConverter(
+ descriptor, PbFormatType.getTypeByFieldDescriptor(descriptor));
+ }
+
+ private static ProtoToRowDataConverter createNullableConverter(
+ Descriptors.FieldDescriptor descriptor, PbFormatType protoType) {
+ final ProtoToRowDataConverter converter = createConverter(descriptor, protoType);
+ return protoObject -> {
+ if (protoObject == null) {
+ return null;
+ }
+ return converter.convert(protoObject);
+ };
+ }
+
+ /**
+ * In standard ProtocolBuffers, mapKey can be any integral or string type (so, any scalar type
+ * except for floating point types and bytes), mapValue not can be mapType and arrayType
+ *
+ * @return a runtime map converter
+ */
+ public static ProtoToRowDataConverter createMapConverter(
+ Descriptors.FieldDescriptor fieldDescriptor) {
+ // get builder method
+ Tuple4 mapTypeTuple =
+ getMapTypeTuple(PROTO_CLASS_NAME, PROTO_PACKAGE_NAME, fieldDescriptor);
+ // create keyConverter and valueConverter
+ final ProtoToRowDataConverter keyConverter = createNullableConverter(null, mapTypeTuple.f0);
+ final ProtoToRowDataConverter valueConverter;
+ if (mapTypeTuple.f1 == PbFormatType.MESSAGE) {
+ valueConverter = createMessageDataConverter(mapTypeTuple.f3.getName());
+ } else {
+ // scalar type/enum
+ valueConverter = createConverter(null, mapTypeTuple.f1);
+ }
+ // create runtime map converter
+ return object -> {
+ final Map, ?> map = (Map, ?>) object;
+ Map