Skip to content

Commit f26b84f

Browse files
committed
fixed issue #3829 , kafka connector support k8s env
1 parent 0a2254e commit f26b84f

File tree

2 files changed

+5
-0
lines changed

2 files changed

+5
-0
lines changed

connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/consumer/CanalKafkaConsumer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.ConcurrentHashMap;
1010
import java.util.concurrent.TimeUnit;
1111

12+
import com.alibaba.otter.canal.common.utils.PropertiesUtils;
1213
import org.apache.kafka.clients.consumer.ConsumerRecord;
1314
import org.apache.kafka.clients.consumer.ConsumerRecords;
1415
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -53,6 +54,8 @@ public void init(Properties properties, String topic, String groupId) {
5354
String k = (String) entry.getKey();
5455
Object v = entry.getValue();
5556
if (k.startsWith(PREFIX_KAFKA_CONFIG) && v != null) {
57+
// check env config
58+
v = PropertiesUtils.getProperty(properties, k);
5659
kafkaProperties.put(k.substring(PREFIX_KAFKA_CONFIG.length()), v);
5760
}
5861
}

connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ private void loadKafkaProperties(Properties properties) {
102102
String key = (String) entry.getKey();
103103
Object value = entry.getValue();
104104
if (key.startsWith(PREFIX_KAFKA_CONFIG) && value != null) {
105+
// check env config
106+
value = PropertiesUtils.getProperty(properties, key);
105107
key = key.substring(PREFIX_KAFKA_CONFIG.length());
106108
kafkaProperties.put(key, value);
107109
}

0 commit comments

Comments
 (0)