From 2463d1bbecde1a1b1a2f4614320e1ac838bbd487 Mon Sep 17 00:00:00 2001 From: liumengkai <1623857502@qq.com> Date: Sun, 9 Oct 2022 10:40:14 +0800 Subject: [PATCH 1/2] [feat-#1293][chunjun-connector-hdfs] add assembleFieldProps for hdfs connector during column converter --- .../hdfs/converter/HdfsTextColumnConverter.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java index ae00fc3e41..71d53ec6ca 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java @@ -51,8 +51,11 @@ public class HdfsTextColumnConverter extends AbstractRowConverter { + private final List hdfsFieldConfList; + public HdfsTextColumnConverter(List fieldConfList) { super(fieldConfList.size()); + hdfsFieldConfList = fieldConfList; for (int i = 0; i < fieldConfList.size(); i++) { String type = fieldConfList.get(i).getType(); int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); @@ -74,11 +77,14 @@ public RowData toInternal(RowData input) throws Exception { if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; for (int i = 0; i < input.getArity(); i++) { + FieldConf fieldConf = hdfsFieldConfList.get(i); row.addField( - (AbstractBaseColumn) - toInternalConverters - .get(i) - .deserialize(genericRowData.getField(i))); + assembleFieldProps( + fieldConf, + (AbstractBaseColumn) + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i)))); } } else { throw new ChunJunRuntimeException( From ea2be4df585d3e7f60bff2511dedf00e4a74473f Mon Sep 17 00:00:00 2001 From: liumengkai <1623857502@qq.com> Date: Sun, 9 Oct 2022 11:51:59 +0800 Subject: [PATCH 2/2] [feat-#1293][chunjun-connector-hdfs] add assembleFieldProps for hdfs connector during column converter orc&parquet&text --- .../hdfs/converter/HdfsOrcColumnConverter.java | 16 ++++++++++------ .../converter/HdfsParquetColumnConverter.java | 16 ++++++++++------ .../hdfs/converter/HdfsTextColumnConverter.java | 12 +++++------- .../connector/hdfs/sink/HdfsSinkFactory.java | 3 ++- .../connector/hdfs/source/HdfsSourceFactory.java | 3 ++- .../chunjun/connector/hdfs/util/HdfsUtil.java | 10 ++++++---- .../connector/hive/sink/HiveOutputFormat.java | 3 ++- .../chunjun/converter/AbstractRowConverter.java | 7 +++++++ 8 files changed, 44 insertions(+), 26 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsOrcColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsOrcColumnConverter.java index e15140cbe9..a4df99af9a 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsOrcColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsOrcColumnConverter.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.hdfs.converter; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; @@ -61,8 +62,8 @@ public class HdfsOrcColumnConverter private List ColumnNameList; private transient Map decimalColInfo; - public HdfsOrcColumnConverter(List fieldConfList) { - super(fieldConfList.size()); + public HdfsOrcColumnConverter(List fieldConfList, HdfsConf hdfsConf) { + super(fieldConfList.size(), hdfsConf); for (int i = 0; i < fieldConfList.size(); i++) { String type = fieldConfList.get(i).getType(); int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); @@ -83,12 +84,15 @@ public RowData toInternal(RowData input) throws Exception { ColumnRowData row = new ColumnRowData(input.getArity()); if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; + List fieldConfList = commonConf.getColumn(); for (int i = 0; i < input.getArity(); i++) { row.addField( - (AbstractBaseColumn) - toInternalConverters - .get(i) - .deserialize(genericRowData.getField(i))); + assembleFieldProps( + fieldConfList.get(i), + (AbstractBaseColumn) + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i)))); } } else { throw new ChunJunRuntimeException( diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetColumnConverter.java index f5d5bdac4f..63f54be4ff 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsParquetColumnConverter.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.hdfs.converter; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf; import com.dtstack.chunjun.connector.hdfs.util.HdfsUtil; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.AbstractRowConverter; @@ -64,8 +65,8 @@ public class HdfsParquetColumnConverter private List columnNameList; private transient Map decimalColInfo; - public HdfsParquetColumnConverter(List fieldConfList) { - super(fieldConfList.size()); + public HdfsParquetColumnConverter(List fieldConfList, HdfsConf hdfsConf) { + super(fieldConfList.size(), hdfsConf); for (int i = 0; i < fieldConfList.size(); i++) { String type = fieldConfList.get(i).getType(); int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); @@ -86,12 +87,15 @@ public RowData toInternal(RowData input) throws Exception { ColumnRowData row = new ColumnRowData(input.getArity()); if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; + List fieldConfList = commonConf.getColumn(); for (int i = 0; i < input.getArity(); i++) { row.addField( - (AbstractBaseColumn) - toInternalConverters - .get(i) - .deserialize(genericRowData.getField(i))); + assembleFieldProps( + fieldConfList.get(i), + (AbstractBaseColumn) + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i)))); } } else { throw new ChunJunRuntimeException( diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java index 71d53ec6ca..fd016203ab 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/converter/HdfsTextColumnConverter.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.hdfs.converter; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf; import com.dtstack.chunjun.constants.ConstantValue; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; @@ -51,11 +52,8 @@ public class HdfsTextColumnConverter extends AbstractRowConverter { - private final List hdfsFieldConfList; - - public HdfsTextColumnConverter(List fieldConfList) { - super(fieldConfList.size()); - hdfsFieldConfList = fieldConfList; + public HdfsTextColumnConverter(List fieldConfList, HdfsConf hdfsConf) { + super(fieldConfList.size(), hdfsConf); for (int i = 0; i < fieldConfList.size(); i++) { String type = fieldConfList.get(i).getType(); int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL); @@ -76,11 +74,11 @@ public RowData toInternal(RowData input) throws Exception { ColumnRowData row = new ColumnRowData(input.getArity()); if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; + List fieldConfList = commonConf.getColumn(); for (int i = 0; i < input.getArity(); i++) { - FieldConf fieldConf = hdfsFieldConfList.get(i); row.addField( assembleFieldProps( - fieldConf, + fieldConfList.get(i), (AbstractBaseColumn) toInternalConverters .get(i) diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsSinkFactory.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsSinkFactory.java index 8242a951a3..00b3006018 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/sink/HdfsSinkFactory.java @@ -57,7 +57,8 @@ public DataStreamSink createSink(DataStream dataSet) { useAbstractBaseColumn, hdfsConf.getFileType(), hdfsConf.getColumn(), - getRawTypeConverter()); + getRawTypeConverter(), + hdfsConf); builder.setRowConverter(rowConverter, useAbstractBaseColumn); return createOutput(dataSet, builder.finish()); diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsSourceFactory.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsSourceFactory.java index a6dcec67fb..f4aa0b50e1 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsSourceFactory.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/source/HdfsSourceFactory.java @@ -56,7 +56,8 @@ public DataStream createSource() { useAbstractBaseColumn, hdfsConf.getFileType(), hdfsConf.getColumn(), - getRawTypeConverter()); + getRawTypeConverter(), + hdfsConf); builder.setRowConverter(rowConverter, useAbstractBaseColumn); return createInput(builder.finish()); diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/util/HdfsUtil.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/util/HdfsUtil.java index b83d51daab..6757e0a625 100644 --- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/util/HdfsUtil.java +++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/util/HdfsUtil.java @@ -18,6 +18,7 @@ package com.dtstack.chunjun.connector.hdfs.util; import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.hdfs.conf.HdfsConf; import com.dtstack.chunjun.connector.hdfs.converter.HdfsOrcColumnConverter; import com.dtstack.chunjun.connector.hdfs.converter.HdfsOrcRowConverter; import com.dtstack.chunjun.connector.hdfs.converter.HdfsParquetColumnConverter; @@ -331,18 +332,19 @@ public static AbstractRowConverter createRowConverter( boolean useAbstractBaseColumn, String fileType, List fieldConfList, - RawTypeConverter converter) { + RawTypeConverter converter, + HdfsConf hdfsConf) { AbstractRowConverter rowConverter; if (useAbstractBaseColumn) { switch (FileType.getByName(fileType)) { case ORC: - rowConverter = new HdfsOrcColumnConverter(fieldConfList); + rowConverter = new HdfsOrcColumnConverter(fieldConfList, hdfsConf); break; case PARQUET: - rowConverter = new HdfsParquetColumnConverter(fieldConfList); + rowConverter = new HdfsParquetColumnConverter(fieldConfList, hdfsConf); break; default: - rowConverter = new HdfsTextColumnConverter(fieldConfList); + rowConverter = new HdfsTextColumnConverter(fieldConfList, hdfsConf); } } else { RowType rowType = TableUtil.createRowType(fieldConfList, converter); diff --git a/chunjun-connectors/chunjun-connector-hive/src/main/java/com/dtstack/chunjun/connector/hive/sink/HiveOutputFormat.java b/chunjun-connectors/chunjun-connector-hive/src/main/java/com/dtstack/chunjun/connector/hive/sink/HiveOutputFormat.java index 0bf1034861..11e0092bb4 100644 --- a/chunjun-connectors/chunjun-connector-hive/src/main/java/com/dtstack/chunjun/connector/hive/sink/HiveOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-hive/src/main/java/com/dtstack/chunjun/connector/hive/sink/HiveOutputFormat.java @@ -322,7 +322,8 @@ private BaseHdfsOutputFormat createHdfsOutputFormat( useAbstractBaseColumn, copyHiveConf.getFileType(), fieldConfList, - HdfsRawTypeConverter::apply), + HdfsRawTypeConverter::apply, + hiveConf), useAbstractBaseColumn); builder.setInitAccumulatorAndDirty(false); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java index 99615653ca..74700fa6fb 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/converter/AbstractRowConverter.java @@ -87,6 +87,12 @@ public AbstractRowConverter(int converterSize) { this.toExternalConverters = new ArrayList<>(converterSize); } + public AbstractRowConverter(int converterSize, ChunJunCommonConf commonConf) { + this.toInternalConverters = new ArrayList<>(converterSize); + this.toExternalConverters = new ArrayList<>(converterSize); + this.commonConf = commonConf; + } + protected IDeserializationConverter wrapIntoNullableInternalConverter( IDeserializationConverter IDeserializationConverter) { return val -> { @@ -187,6 +193,7 @@ protected ISerializationConverter wrapIntoNullableExternalConverter( public RowData toInternalLookup(LookupT input) throws Exception { throw new RuntimeException("Subclass need rewriting"); } + /** * BinaryRowData *