From 97a5cc3369da7db2184eb87be976ca870dc887ff Mon Sep 17 00:00:00 2001 From: junfenglin <905715760@qq.com> Date: Fri, 21 Jan 2022 22:25:09 +0800 Subject: [PATCH] =?UTF-8?q?[fix][connectors=20redis]redis=20sink=20values?= =?UTF-8?q?=E4=BF=9D=E7=95=99=E5=AD=97=E6=AE=B5=E7=B1=BB=E5=9E=8B=E6=96=B9?= =?UTF-8?q?=E4=BE=BF=E5=8F=8D=E5=BA=8F=E5=88=97=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redis/converter/RedisColumnConverter.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java index 5dcd686fcb..45d00d2f0e 100644 --- a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisColumnConverter.java @@ -18,6 +18,7 @@ package com.dtstack.flinkx.connector.redis.converter; +import com.dtstack.flinkx.conf.FieldConf; import com.dtstack.flinkx.connector.redis.conf.RedisConf; import com.dtstack.flinkx.connector.redis.enums.RedisDataMode; import com.dtstack.flinkx.connector.redis.enums.RedisDataType; @@ -25,6 +26,7 @@ import com.dtstack.flinkx.element.ColumnRowData; import com.dtstack.flinkx.element.column.StringColumn; import com.dtstack.flinkx.element.column.TimestampColumn; +import com.dtstack.flinkx.util.JsonUtil; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; @@ -34,7 +36,10 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_CRITICAL_TIME; import static com.dtstack.flinkx.connector.redis.options.RedisOptions.REDIS_KEY_VALUE_SIZE; @@ -141,7 +146,17 @@ private String[] getValues(ColumnRowData row) { } private String concatValues(ColumnRowData row) { - return StringUtils.join(getValues(row), redisConf.getValueFieldDelimiter()); + List columns = redisConf.getColumn(); + Map fieldMap = new HashMap<>(); + int index = 0; + + for (FieldConf fieldConf : columns) { + if (Objects.nonNull(row.getField(index))) { + fieldMap.put(fieldConf.getName(), row.getField(index).getData()); + } + index++; + } + return JsonUtil.toJson(fieldMap); } private String concatKey(ColumnRowData row) {