From 7b25ec2ef5e9e3c16b7b309a2b672ede18c9de6b Mon Sep 17 00:00:00 2001 From: FlechazoW Date: Thu, 7 Jul 2022 14:49:26 +0800 Subject: [PATCH] [hotfix-#1029][redis] Redis Lookup plugin will throw an error when query fractional field not all fields in redis hash data. --- .../redis/converter/RedisColumnConverter.java | 2 +- .../redis/converter/RedisRowConverter.java | 10 ++++++---- .../connector/redis/enums/RedisDataMode.java | 14 ++++++++------ .../connector/redis/enums/RedisDataType.java | 2 +- .../redis/lookup/RedisAllTableFunction.java | 13 ++++++------- 5 files changed, 22 insertions(+), 19 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/converter/RedisColumnConverter.java b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/converter/RedisColumnConverter.java index 27eda89854..715bcf0fde 100644 --- a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/converter/RedisColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/converter/RedisColumnConverter.java @@ -166,7 +166,7 @@ private String[] getValues(ColumnRowData row) { } } - return values.toArray(new String[values.size()]); + return values.toArray(new String[0]); } private String concatValues(ColumnRowData row) { diff --git a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/converter/RedisRowConverter.java b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/converter/RedisRowConverter.java index 49d0f83c3e..8190da86b5 100644 --- a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/converter/RedisRowConverter.java +++ b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/converter/RedisRowConverter.java @@ -118,10 +118,12 @@ private GenericRowData getGenericRowData(Map input) throws Excep typeIndexList.stream() .filter(x -> x.first.equals(key)) .collect(Collectors.toList()); - Triplet typeTriplet = collect.get(0); - genericRowData.setField( - typeTriplet.second, - toInternalConverters.get(typeTriplet.second).deserialize(input.get(key))); + if (!collect.isEmpty()) { + Triplet typeTriplet = collect.get(0); + genericRowData.setField( + typeTriplet.second, + toInternalConverters.get(typeTriplet.second).deserialize(input.get(key))); + } } return genericRowData; diff --git a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/enums/RedisDataMode.java b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/enums/RedisDataMode.java index 4e3cdfe127..946fb5ce86 100644 --- a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/enums/RedisDataMode.java +++ b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/enums/RedisDataMode.java @@ -26,6 +26,7 @@ public enum RedisDataMode { /** reader mode */ + H_GET("hget"), /** write mode */ SET("set"), @@ -33,18 +34,15 @@ public enum RedisDataMode { R_PUSH("rpush"), S_ADD("sadd"), Z_ADD("zadd"), - H_SET("hset"); + H_SET("hset"), + ; - public String mode; + public final String mode; RedisDataMode(String mode) { this.mode = mode; } - public String getMode() { - return mode; - } - public static RedisDataMode getDataMode(String mode) { for (RedisDataMode redisDataMode : RedisDataMode.values()) { if (redisDataMode.getMode().equals(mode)) { @@ -54,4 +52,8 @@ public static RedisDataMode getDataMode(String mode) { throw new RuntimeException("Unsupported redis data mode:" + mode); } + + public String getMode() { + return mode; + } } diff --git a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/enums/RedisDataType.java b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/enums/RedisDataType.java index ef474f1d49..385c49c877 100644 --- a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/enums/RedisDataType.java +++ b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/enums/RedisDataType.java @@ -36,7 +36,7 @@ public enum RedisDataType { HASH("hash"); - public String type; + public final String type; RedisDataType(String type) { this.type = type; diff --git a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/lookup/RedisAllTableFunction.java b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/lookup/RedisAllTableFunction.java index 0fe52ed6f6..5210f639aa 100644 --- a/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/lookup/RedisAllTableFunction.java +++ b/chunjun-connectors/chunjun-connector-redis/src/main/java/com/dtstack/chunjun/connector/redis/lookup/RedisAllTableFunction.java @@ -53,8 +53,8 @@ public class RedisAllTableFunction extends AbstractAllTableFunction { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(RedisAllTableFunction.class); + private final RedisConf redisConf; private transient RedisSyncClient redisSyncClient; - private RedisConf redisConf; public RedisAllTableFunction( RedisConf redisConf, @@ -68,13 +68,12 @@ public RedisAllTableFunction( @Override public void eval(Object... keys) { - StringBuilder keyPattern = new StringBuilder(redisConf.getTableName()); - keyPattern - .append("_") - .append(Arrays.stream(keys).map(String::valueOf).collect(Collectors.joining("_"))); + String keyPattern = + redisConf.getTableName() + + "_" + + Arrays.stream(keys).map(String::valueOf).collect(Collectors.joining("_")); List> cacheList = - ((Map>>) cacheRef.get()) - .get(keyPattern.toString()); + ((Map>>) cacheRef.get()).get(keyPattern); // 有数据才往下发,(左/内)连接flink会做相应的处理 if (!CollectionUtils.isEmpty(cacheList)) {