Skip to content

Commit 997957d

Browse files
authored
新增配置 canal.mq.enableDynamicQueuePartition,获取topic对应的队列的数量为分区的数量,以支持动态队列的场景(如阿里云的rocketmq,无法人工设置队列数量,可能会动态伸缩) (#3670)
1 parent ea20076 commit 997957d

File tree

8 files changed

+77
-36
lines changed

8 files changed

+77
-36
lines changed

connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQDestination.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class MQDestination {
1515
private String partitionHash;
1616
private String dynamicTopic;
1717
private String dynamicTopicPartitionNum;
18+
private Boolean enableDynamicQueuePartition;
1819

1920
public String getCanalDestination() {
2021
return canalDestination;
@@ -71,4 +72,12 @@ public String getDynamicTopicPartitionNum() {
7172
public void setDynamicTopicPartitionNum(String dynamicTopicPartitionNum) {
7273
this.dynamicTopicPartitionNum = dynamicTopicPartitionNum;
7374
}
75+
76+
public Boolean getEnableDynamicQueuePartition() {
77+
return enableDynamicQueuePartition;
78+
}
79+
80+
public void setEnableDynamicQueuePartition(Boolean enableDynamicQueuePartition) {
81+
this.enableDynamicQueuePartition = enableDynamicQueuePartition;
82+
}
7483
}

connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,5 @@
11
package com.alibaba.otter.canal.connector.rocketmq.producer;
22

3-
import java.util.ArrayList;
4-
import java.util.List;
5-
import java.util.Map;
6-
import java.util.Properties;
7-
import java.util.concurrent.ArrayBlockingQueue;
8-
import java.util.concurrent.ThreadPoolExecutor;
9-
import java.util.concurrent.TimeUnit;
10-
import java.util.stream.Collectors;
11-
12-
import org.apache.commons.lang.StringUtils;
13-
import org.apache.rocketmq.acl.common.AclClientRPCHook;
14-
import org.apache.rocketmq.acl.common.SessionCredentials;
15-
import org.apache.rocketmq.client.AccessChannel;
16-
import org.apache.rocketmq.client.exception.MQClientException;
17-
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
18-
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
19-
import org.apache.rocketmq.client.producer.DefaultMQProducer;
20-
import org.apache.rocketmq.client.producer.SendResult;
21-
import org.apache.rocketmq.common.message.Message;
22-
import org.apache.rocketmq.common.message.MessageQueue;
23-
import org.apache.rocketmq.remoting.RPCHook;
24-
import org.slf4j.Logger;
25-
import org.slf4j.LoggerFactory;
26-
273
import com.alibaba.fastjson.JSON;
284
import com.alibaba.fastjson.serializer.SerializerFeature;
295
import com.alibaba.otter.canal.common.CanalException;
@@ -40,6 +16,29 @@
4016
import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQConstants;
4117
import com.alibaba.otter.canal.connector.rocketmq.config.RocketMQProducerConfig;
4218
import com.alibaba.otter.canal.protocol.FlatMessage;
19+
import org.apache.commons.lang.StringUtils;
20+
import org.apache.rocketmq.acl.common.AclClientRPCHook;
21+
import org.apache.rocketmq.acl.common.SessionCredentials;
22+
import org.apache.rocketmq.client.AccessChannel;
23+
import org.apache.rocketmq.client.exception.MQClientException;
24+
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
25+
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
26+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
27+
import org.apache.rocketmq.client.producer.SendResult;
28+
import org.apache.rocketmq.common.message.Message;
29+
import org.apache.rocketmq.common.message.MessageQueue;
30+
import org.apache.rocketmq.remoting.RPCHook;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Properties;
38+
import java.util.concurrent.ArrayBlockingQueue;
39+
import java.util.concurrent.ThreadPoolExecutor;
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.stream.Collectors;
4342

4443
/**
4544
* RocketMQ Producer SPI 实现
@@ -186,6 +185,12 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
186185
// 获取当前topic的分区数
187186
Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
188187
destination.getDynamicTopicPartitionNum());
188+
189+
// 获取topic的队列数为分区数
190+
if(partitionNum == null){
191+
partitionNum = getTopicDynamicQueuesSize(destination.getEnableDynamicQueuePartition(), topicName);
192+
}
193+
189194
if (partitionNum == null) {
190195
partitionNum = destination.getPartitionsNum();
191196
}
@@ -353,4 +358,18 @@ public void stop() {
353358

354359
super.stop();
355360
}
361+
362+
private Integer getTopicDynamicQueuesSize(Boolean enable, String topicName){
363+
if(enable!=null && enable){
364+
topicName = this.defaultMQProducer.withNamespace(topicName);
365+
DefaultMQProducerImpl innerProducer = this.defaultMQProducer.getDefaultMQProducerImpl();
366+
TopicPublishInfo topicInfo = innerProducer.getTopicPublishInfoTable().get(topicName);
367+
if(topicInfo == null){
368+
return null;
369+
}else{
370+
return topicInfo.getMessageQueueList().size();
371+
}
372+
}
373+
return null;
374+
}
356375
}

deployer/src/main/resources/spring/default-instance.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,5 +206,6 @@
206206
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
207207
<property name="partitionHash" value="${canal.mq.partitionHash}" />
208208
<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
209+
<property name="enableDynamicQueuePartition" value="${canal.mq.enableDynamicQueuePartition}" />
209210
</bean>
210211
</beans>

deployer/src/main/resources/spring/file-instance.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,5 +192,6 @@
192192
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
193193
<property name="partitionHash" value="${canal.mq.partitionHash}" />
194194
<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
195+
<property name="enableDynamicQueuePartition" value="${canal.mq.enableDynamicQueuePartition}" />
195196
</bean>
196197
</beans>

deployer/src/main/resources/spring/group-instance.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,5 +287,6 @@
287287
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
288288
<property name="partitionHash" value="${canal.mq.partitionHash}" />
289289
<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
290+
<property name="enableDynamicQueuePartition" value="${canal.mq.enableDynamicQueuePartition}" />
290291
</bean>
291292
</beans>

deployer/src/main/resources/spring/memory-instance.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,5 +180,6 @@
180180
<property name="partitionsNum" value="${canal.mq.partitionsNum}" />
181181
<property name="partitionHash" value="${canal.mq.partitionHash}" />
182182
<property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
183+
<property name="enableDynamicQueuePartition" value="${canal.mq.enableDynamicQueuePartition}" />
183184
</bean>
184185
</beans>

instance/core/src/main/java/com/alibaba/otter/canal/instance/core/CanalMQConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ public class CanalMQConfig {
88
private String partitionHash;
99
private String dynamicTopic;
1010
private String dynamicTopicPartitionNum;
11+
private Boolean enableDynamicQueuePartition;
1112

1213
public String getTopic() {
1314
return topic;
@@ -56,4 +57,12 @@ public String getDynamicTopicPartitionNum() {
5657
public void setDynamicTopicPartitionNum(String dynamicTopicPartitionNum) {
5758
this.dynamicTopicPartitionNum = dynamicTopicPartitionNum;
5859
}
60+
61+
public Boolean getEnableDynamicQueuePartition() {
62+
return enableDynamicQueuePartition;
63+
}
64+
65+
public void setEnableDynamicQueuePartition(Boolean enableDynamicQueuePartition) {
66+
this.enableDynamicQueuePartition = enableDynamicQueuePartition;
67+
}
5968
}

server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,5 @@
11
package com.alibaba.otter.canal.server;
22

3-
import java.util.Map;
4-
import java.util.concurrent.ConcurrentHashMap;
5-
import java.util.concurrent.ExecutorService;
6-
import java.util.concurrent.Executors;
7-
import java.util.concurrent.TimeUnit;
8-
import java.util.concurrent.atomic.AtomicBoolean;
9-
10-
import org.apache.commons.lang.StringUtils;
11-
import org.slf4j.Logger;
12-
import org.slf4j.LoggerFactory;
13-
import org.slf4j.MDC;
14-
153
import com.alibaba.otter.canal.connector.core.config.MQProperties;
164
import com.alibaba.otter.canal.connector.core.producer.MQDestination;
175
import com.alibaba.otter.canal.connector.core.spi.CanalMQProducer;
@@ -21,6 +9,17 @@
219
import com.alibaba.otter.canal.protocol.ClientIdentity;
2210
import com.alibaba.otter.canal.protocol.Message;
2311
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
12+
import org.apache.commons.lang.StringUtils;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
import org.slf4j.MDC;
16+
17+
import java.util.Map;
18+
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.Executors;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2423

2524
public class CanalMQStarter {
2625

@@ -157,6 +156,7 @@ private void worker(String destination, AtomicBoolean destinationRunning) {
157156
canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
158157
canalDestination.setPartitionHash(mqConfig.getPartitionHash());
159158
canalDestination.setDynamicTopicPartitionNum(mqConfig.getDynamicTopicPartitionNum());
159+
canalDestination.setEnableDynamicQueuePartition(mqConfig.getEnableDynamicQueuePartition());
160160

161161
canalServer.subscribe(clientIdentity);
162162
logger.info("## the MQ producer: {} is running now ......", destination);

0 commit comments

Comments
 (0)