Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.dtstack.chunjun.element.column.BytesColumn;
import com.dtstack.chunjun.element.column.NullColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
Expand All @@ -57,6 +58,8 @@
public class InfluxdbColumnConverter
extends AbstractRowConverter<Map<String, Object>, RowData, Point.Builder, LogicalType> {

private static final String TIME_KEY = "time";

private String format = "MSGPACK";
private List<String> fieldNameList;
private List<FieldConf> fieldConfList;
Expand All @@ -72,7 +75,8 @@ public InfluxdbColumnConverter(
RowType rowType,
ChunJunCommonConf commonConf,
List<String> fieldNameList,
String format) {
String format,
TimeUnit precision) {
super(rowType, commonConf);
for (int i = 0; i < rowType.getFieldCount(); i++) {
toInternalConverters.add(
Expand All @@ -85,6 +89,7 @@ public InfluxdbColumnConverter(
this.format = format;
this.fieldConfList = commonConf.getColumn();
this.fieldNameList = fieldNameList;
this.precision = precision;
}

public InfluxdbColumnConverter(
Expand Down Expand Up @@ -124,33 +129,49 @@ protected ISerializationConverter<Point.Builder> wrapIntoNullableExternalConvert

@Override
public RowData toInternal(Map<String, Object> input) throws Exception {

int converterIndex = 0;
if (fieldConfList.size() == 1
&& StringUtils.equals(ConstantValue.STAR_SYMBOL, fieldConfList.get(0).getName())) {
ColumnRowData result = new ColumnRowData(fieldNameList.size());
for (int i = 0; i < fieldNameList.size(); i++) {
Object field = input.get(fieldNameList.get(i));
AbstractBaseColumn baseColumn =
(AbstractBaseColumn) toInternalConverters.get(i).deserialize(field);
for (String fieldName : fieldNameList) {
AbstractBaseColumn baseColumn = setValue(input, fieldName, converterIndex);
result.addField(baseColumn);
converterIndex++;
}
return result;
}

ColumnRowData result = new ColumnRowData(fieldConfList.size());
int converterIndex = 0;
for (FieldConf fieldConf : fieldConfList) {
AbstractBaseColumn baseColumn = null;
if (StringUtils.isBlank(fieldConf.getValue())) {
Object field = input.get(fieldConf.getName());
baseColumn =
(AbstractBaseColumn)
toInternalConverters.get(converterIndex).deserialize(field);
} else {
ColumnRowData result = new ColumnRowData(fieldConfList.size());
for (FieldConf fieldConf : fieldConfList) {
String fieldName = fieldConf.getName();
AbstractBaseColumn baseColumn = setValue(input, fieldName, converterIndex);
result.addField(assembleFieldProps(fieldConf, baseColumn));
converterIndex++;
}
result.addField(assembleFieldProps(fieldConf, baseColumn));
return result;
}
}

/**
* Set the value of input into column.
*
* @param input input value.
* @param fieldName field name of input.
* @param index index of converter.
* @return column
* @throws Exception the exception from converter.
*/
private AbstractBaseColumn setValue(Map<String, Object> input, String fieldName, int index)
throws Exception {
AbstractBaseColumn baseColumn;
if (TIME_KEY.equalsIgnoreCase(fieldName)) {
Long timeLong = (Long) input.get(fieldName);
long timeMs = TimeUnit.MILLISECONDS.convert(timeLong, precision);
baseColumn = new TimestampColumn(timeMs);
} else {
Object field = input.get(fieldName);
baseColumn = (AbstractBaseColumn) toInternalConverters.get(index).deserialize(field);
}
return result;
return baseColumn;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.dtstack.chunjun.connector.influxdb.conf.InfluxdbSourceConfig;
import com.dtstack.chunjun.connector.influxdb.converter.InfluxdbColumnConverter;
import com.dtstack.chunjun.connector.influxdb.converter.InfluxdbRawTypeConverter;
import com.dtstack.chunjun.connector.influxdb.enums.TimePrecisionEnums;
import com.dtstack.chunjun.source.format.BaseRichInputFormat;
import com.dtstack.chunjun.throwable.ReadRecordException;
import com.dtstack.chunjun.util.ColumnBuildUtil;
Expand Down Expand Up @@ -54,6 +55,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -74,6 +76,7 @@ public class InfluxdbInputFormat extends BaseRichInputFormat {

private InfluxdbSourceConfig config;
private String queryTemplate;
private TimeUnit precision;
private transient InfluxDB influxDB;
private transient AtomicBoolean hasNext;
private transient BlockingQueue<Map<String, Object>> queue;
Expand All @@ -93,6 +96,8 @@ protected void openInternal(InputSplit inputSplit) throws IOException {
LOG.info("subTask[{}] inputSplit = {}.", indexOfSubTask, inputSplit);
this.queue = new LinkedBlockingQueue<>(config.getFetchSize() * 3);
this.hasNext = new AtomicBoolean(true);
this.precision = TimePrecisionEnums.of(config.getEpoch()).getPrecision();

connect();

Pair<List<String>, List<String>> pair = getTableMetadata();
Expand All @@ -107,7 +112,8 @@ protected void openInternal(InputSplit inputSplit) throws IOException {

// TODO add InfluxdbRawConverter
setRowConverter(
new InfluxdbColumnConverter(rowType, config, columnNameList, config.getFormat()));
new InfluxdbColumnConverter(
rowType, config, columnNameList, config.getFormat(), precision));

this.queryInfluxQLBuilder = new InfluxdbQuerySqlBuilder(config, columnNameList);
this.queryTemplate = queryInfluxQLBuilder.buildSql();
Expand All @@ -133,6 +139,11 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException
RowData row;
try {
Map<String, Object> data = queue.poll(5, TimeUnit.SECONDS);

if (Objects.isNull(data)) {
return null;
}

row = rowConverter.toInternal(data);
} catch (Exception e) {
throw new ReadRecordException("can not read next record.", e, -1, rowData);
Expand Down