From 3dd76b85634ea0043be602015dc4ac44d8e3cc33 Mon Sep 17 00:00:00 2001 From: FlechazoW Date: Sun, 17 Jul 2022 16:58:41 +0800 Subject: [PATCH] [hotfix-#1063][kafka] Fix sync Task of kafka throws npe. --- .../connector/kafka/converter/KafkaColumnConverter.java | 4 ---- .../main/java/com/dtstack/chunjun/source/SourceFactory.java | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaColumnConverter.java b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaColumnConverter.java index 4c46e61909..3edaa3e5e7 100644 --- a/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaColumnConverter.java @@ -70,8 +70,6 @@ public class KafkaColumnConverter extends AbstractRowConverter keyTypeList) { this.kafkaConf = kafkaConf; this.outList = keyTypeList; - this.jsonDecoder = new JsonDecoder(); if (DEFAULT_CODEC.defaultValue().equals(kafkaConf.getCodec())) { this.decode = new JsonDecoder(); } else { @@ -90,7 +87,6 @@ public KafkaColumnConverter(KafkaConf kafkaConf, List keyTypeList) { public KafkaColumnConverter(KafkaConf kafkaConf) { this.commonConf = this.kafkaConf = kafkaConf; - this.jsonDecoder = new JsonDecoder(); if (DEFAULT_CODEC.defaultValue().equals(kafkaConf.getCodec())) { this.decode = new JsonDecoder(); } else { diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/source/SourceFactory.java b/chunjun-core/src/main/java/com/dtstack/chunjun/source/SourceFactory.java index 1f487a9bf6..9f3142773a 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/source/SourceFactory.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/source/SourceFactory.java @@ -127,7 +127,7 @@ protected DataStream createInput( protected DataStream createInput( RichParallelSourceFunction function, String sourceName) { Preconditions.checkNotNull(sourceName); - return env.addSource(function, sourceName, getTypeInformation()); + return env.addSource(function, sourceName); } protected DataStream createInput(InputFormat inputFormat) {