diff --git a/chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/converter/InfluxdbColumnConverter.java b/chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/converter/InfluxdbColumnConverter.java index cbec38f092..afc378c274 100644 --- a/chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/converter/InfluxdbColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/converter/InfluxdbColumnConverter.java @@ -35,6 +35,7 @@ import com.dtstack.chunjun.element.column.BytesColumn; import com.dtstack.chunjun.element.column.NullColumn; import com.dtstack.chunjun.element.column.StringColumn; +import com.dtstack.chunjun.element.column.TimestampColumn; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; @@ -57,6 +58,8 @@ public class InfluxdbColumnConverter extends AbstractRowConverter, RowData, Point.Builder, LogicalType> { + private static final String TIME_KEY = "time"; + private String format = "MSGPACK"; private List fieldNameList; private List fieldConfList; @@ -72,7 +75,8 @@ public InfluxdbColumnConverter( RowType rowType, ChunJunCommonConf commonConf, List fieldNameList, - String format) { + String format, + TimeUnit precision) { super(rowType, commonConf); for (int i = 0; i < rowType.getFieldCount(); i++) { toInternalConverters.add( @@ -85,6 +89,7 @@ public InfluxdbColumnConverter( this.format = format; this.fieldConfList = commonConf.getColumn(); this.fieldNameList = fieldNameList; + this.precision = precision; } public InfluxdbColumnConverter( @@ -124,33 +129,49 @@ protected ISerializationConverter wrapIntoNullableExternalConvert @Override public RowData toInternal(Map input) throws Exception { - + int converterIndex = 0; if (fieldConfList.size() == 1 && StringUtils.equals(ConstantValue.STAR_SYMBOL, fieldConfList.get(0).getName())) { ColumnRowData result = new ColumnRowData(fieldNameList.size()); - for (int i = 0; i < fieldNameList.size(); i++) { - Object field = input.get(fieldNameList.get(i)); - AbstractBaseColumn baseColumn = - (AbstractBaseColumn) toInternalConverters.get(i).deserialize(field); + for (String fieldName : fieldNameList) { + AbstractBaseColumn baseColumn = setValue(input, fieldName, converterIndex); result.addField(baseColumn); + converterIndex++; } return result; - } - - ColumnRowData result = new ColumnRowData(fieldConfList.size()); - int converterIndex = 0; - for (FieldConf fieldConf : fieldConfList) { - AbstractBaseColumn baseColumn = null; - if (StringUtils.isBlank(fieldConf.getValue())) { - Object field = input.get(fieldConf.getName()); - baseColumn = - (AbstractBaseColumn) - toInternalConverters.get(converterIndex).deserialize(field); + } else { + ColumnRowData result = new ColumnRowData(fieldConfList.size()); + for (FieldConf fieldConf : fieldConfList) { + String fieldName = fieldConf.getName(); + AbstractBaseColumn baseColumn = setValue(input, fieldName, converterIndex); + result.addField(assembleFieldProps(fieldConf, baseColumn)); converterIndex++; } - result.addField(assembleFieldProps(fieldConf, baseColumn)); + return result; + } + } + + /** + * Set the value of input into column. + * + * @param input input value. + * @param fieldName field name of input. + * @param index index of converter. + * @return column + * @throws Exception the exception from converter. + */ + private AbstractBaseColumn setValue(Map input, String fieldName, int index) + throws Exception { + AbstractBaseColumn baseColumn; + if (TIME_KEY.equalsIgnoreCase(fieldName)) { + Long timeLong = (Long) input.get(fieldName); + long timeMs = TimeUnit.MILLISECONDS.convert(timeLong, precision); + baseColumn = new TimestampColumn(timeMs); + } else { + Object field = input.get(fieldName); + baseColumn = (AbstractBaseColumn) toInternalConverters.get(index).deserialize(field); } - return result; + return baseColumn; } @Override diff --git a/chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/source/InfluxdbInputFormat.java b/chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/source/InfluxdbInputFormat.java index 74c5acedf5..e550607d3b 100644 --- a/chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/source/InfluxdbInputFormat.java +++ b/chunjun-connectors/chunjun-connector-influxdb/src/main/java/com/dtstack/chunjun/connector/influxdb/source/InfluxdbInputFormat.java @@ -25,6 +25,7 @@ import com.dtstack.chunjun.connector.influxdb.conf.InfluxdbSourceConfig; import com.dtstack.chunjun.connector.influxdb.converter.InfluxdbColumnConverter; import com.dtstack.chunjun.connector.influxdb.converter.InfluxdbRawTypeConverter; +import com.dtstack.chunjun.connector.influxdb.enums.TimePrecisionEnums; import com.dtstack.chunjun.source.format.BaseRichInputFormat; import com.dtstack.chunjun.throwable.ReadRecordException; import com.dtstack.chunjun.util.ColumnBuildUtil; @@ -54,6 +55,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -74,6 +76,7 @@ public class InfluxdbInputFormat extends BaseRichInputFormat { private InfluxdbSourceConfig config; private String queryTemplate; + private TimeUnit precision; private transient InfluxDB influxDB; private transient AtomicBoolean hasNext; private transient BlockingQueue> queue; @@ -93,6 +96,8 @@ protected void openInternal(InputSplit inputSplit) throws IOException { LOG.info("subTask[{}] inputSplit = {}.", indexOfSubTask, inputSplit); this.queue = new LinkedBlockingQueue<>(config.getFetchSize() * 3); this.hasNext = new AtomicBoolean(true); + this.precision = TimePrecisionEnums.of(config.getEpoch()).getPrecision(); + connect(); Pair, List> pair = getTableMetadata(); @@ -107,7 +112,8 @@ protected void openInternal(InputSplit inputSplit) throws IOException { // TODO add InfluxdbRawConverter setRowConverter( - new InfluxdbColumnConverter(rowType, config, columnNameList, config.getFormat())); + new InfluxdbColumnConverter( + rowType, config, columnNameList, config.getFormat(), precision)); this.queryInfluxQLBuilder = new InfluxdbQuerySqlBuilder(config, columnNameList); this.queryTemplate = queryInfluxQLBuilder.buildSql(); @@ -133,6 +139,11 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException RowData row; try { Map data = queue.poll(5, TimeUnit.SECONDS); + + if (Objects.isNull(data)) { + return null; + } + row = rowConverter.toInternal(data); } catch (Exception e) { throw new ReadRecordException("can not read next record.", e, -1, rowData);