Skip to content

Commit 25be025

Browse files
authored
Some tweaks for pulsarmq-connector (#4060)
* 1. Add the com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer file to the META-INF/canal directory under the pulsarmq-connector project 2. Introduce pulsar-client-admin for Canal to automatically create multi-partition topics 3. Add the judgment that roleToken is null 4. The disconnect method in CanalPulsarMQConsumer removes this.pulsarMQConsumer.unsubscribe();, this code will cause data loss during stop 5. When getting Pulsar messages, they are all processed as flat messages, because CanalMessageSerializerUtil.deserializer(data) will deserialize exceptions 6. Use groupId as subscriptName, without the pulsarmq.subscriptName parameter, the entire adapter will be the same subscriber name using pulsarmq.subscriptName * 逻辑优化 * 代码优化 * 针对pulsar的调整 * 恢复代码 * 恢复代码 * 处理Producer失效的情况
1 parent 8114d77 commit 25be025

File tree

8 files changed

+109
-32
lines changed

8 files changed

+109
-32
lines changed

client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@
134134
<artifactId>pulsar-client</artifactId>
135135
<scope>provided</scope>
136136
</dependency>
137+
<dependency>
138+
<groupId>org.apache.pulsar</groupId>
139+
<artifactId>pulsar-client-admin</artifactId>
140+
<scope>provided</scope>
141+
</dependency>
137142

138143
<!-- junit -->
139144
<dependency>

connector/pulsarmq-connector/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@
3838
<groupId>org.apache.pulsar</groupId>
3939
<artifactId>pulsar-client</artifactId>
4040
</dependency>
41+
<dependency>
42+
<groupId>org.apache.pulsar</groupId>
43+
<artifactId>pulsar-client-admin</artifactId>
44+
</dependency>
4145
</dependencies>
4246

4347
<build>

connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ public class PulsarMQConstants {
5757
* 最大重试次数
5858
*/
5959
public static final String PULSARMQ_MAX_REDELIVERY_COUNT = ROOT + "." + "maxRedeliveryCount";
60+
/**
61+
* Pulsar admin服务器地址
62+
*/
63+
public static final String PULSARMQ_ADMIN_SERVER_URL = ROOT + "." + "adminServerUrl";
6064

6165

6266
}

connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/config/PulsarMQProducerConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public class PulsarMQProducerConfig extends MQProperties {
3030
* 生产者角色权限,请确保该角色有canal使用的所有topic生产者权限(最低要求)
3131
*/
3232
private String roleToken;
33+
/**
34+
* admin服务器地址
35+
*/
36+
private String adminServerUrl;
3337

3438
public String getServerUrl() {
3539
return serverUrl;
@@ -54,4 +58,12 @@ public String getTopicTenantPrefix() {
5458
public void setTopicTenantPrefix(String topicTenantPrefix) {
5559
this.topicTenantPrefix = topicTenantPrefix;
5660
}
61+
62+
public String getAdminServerUrl() {
63+
return adminServerUrl;
64+
}
65+
66+
public void setAdminServerUrl(String adminServerUrl) {
67+
this.adminServerUrl = adminServerUrl;
68+
}
5769
}

connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/consumer/CanalPulsarMQConsumer.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,14 @@
66
import com.alibaba.otter.canal.connector.core.consumer.CommonMessage;
77
import com.alibaba.otter.canal.connector.core.spi.CanalMsgConsumer;
88
import com.alibaba.otter.canal.connector.core.spi.SPI;
9-
import com.alibaba.otter.canal.connector.core.util.CanalMessageSerializerUtil;
10-
import com.alibaba.otter.canal.connector.core.util.MessageUtil;
119
import com.alibaba.otter.canal.connector.pulsarmq.config.PulsarMQConstants;
12-
import com.alibaba.otter.canal.protocol.Message;
1310
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
1411
import com.google.common.collect.Lists;
1512
import org.apache.commons.lang.StringUtils;
1613
import org.apache.pulsar.client.api.*;
1714

18-
import java.time.LocalDateTime;
19-
import java.time.format.DateTimeFormatter;
2015
import java.util.List;
2116
import java.util.Properties;
22-
import java.util.UUID;
2317
import java.util.concurrent.TimeUnit;
2418

2519
/**
@@ -112,7 +106,9 @@ public void init(Properties properties, String topic, String groupId) {
112106
}
113107
this.serviceUrl = properties.getProperty(PulsarMQConstants.PULSARMQ_SERVER_URL);
114108
this.roleToken = properties.getProperty(PulsarMQConstants.PULSARMQ_ROLE_TOKEN);
115-
this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
109+
//this.subscriptName = properties.getProperty(PulsarMQConstants.PULSARMQ_SUBSCRIPT_NAME);
110+
// 采用groupId作为subscriptName,避免所有的都是同一个订阅者名称
111+
this.subscriptName = groupId;
116112
if (StringUtils.isEmpty(this.subscriptName)) {
117113
throw new RuntimeException("Pulsar Consumer subscriptName required");
118114
}
@@ -157,10 +153,12 @@ public void connect() {
157153
}
158154
// 连接创建客户端
159155
try {
160-
pulsarClient = PulsarClient.builder()
161-
.serviceUrl(serviceUrl)
162-
.authentication(AuthenticationFactory.token(roleToken))
163-
.build();
156+
//AuthenticationDataProvider
157+
ClientBuilder builder = PulsarClient.builder().serviceUrl(serviceUrl);
158+
if(StringUtils.isNotEmpty(roleToken)) {
159+
builder.authentication(AuthenticationFactory.token(roleToken));
160+
}
161+
pulsarClient = builder.build();
164162
} catch (PulsarClientException e) {
165163
throw new RuntimeException(e);
166164
}
@@ -220,22 +218,24 @@ public List<CommonMessage> getMessage(Long timeout, TimeUnit unit) {
220218
List<CommonMessage> messageList = Lists.newArrayList();
221219
try {
222220
Messages<byte[]> messages = pulsarMQConsumer.batchReceive();
223-
224221
if (null == messages || messages.size() == 0) {
225222
return messageList;
226223
}
227224
// 保存当前消费记录,用于ack和rollback
228225
this.lastGetBatchMessage = messages;
229226
for (org.apache.pulsar.client.api.Message<byte[]> msg : messages) {
230227
byte[] data = msg.getData();
231-
if (!this.flatMessage) {
228+
/*if (!this.flatMessage) {
232229
Message message = CanalMessageSerializerUtil.deserializer(data);
233230
List<CommonMessage> list = MessageUtil.convert(message);
234231
messageList.addAll(list);
235232
} else {
236233
CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
237234
messageList.add(commonMessage);
238-
}
235+
}*/
236+
// CanalMessageSerializerUtil.deserializer(data) 会转换失败
237+
CommonMessage commonMessage = JSON.parseObject(data, CommonMessage.class);
238+
messageList.add(commonMessage);
239239
}
240240
} catch (PulsarClientException e) {
241241
throw new CanalClientException("Receive pulsar batch message error", e);
@@ -278,7 +278,8 @@ public void disconnect() {
278278
return;
279279
}
280280
try {
281-
this.pulsarMQConsumer.unsubscribe();
281+
// 会导致暂停期间数据丢失
282+
// this.pulsarMQConsumer.unsubscribe();
282283
this.pulsarClient.close();
283284
} catch (PulsarClientException e) {
284285
throw new CanalClientException("Disconnect pulsar consumer error", e);

connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java

Lines changed: 61 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import com.alibaba.otter.canal.protocol.CanalEntry;
1818
import com.alibaba.otter.canal.protocol.FlatMessage;
1919
import org.apache.commons.lang.StringUtils;
20+
import org.apache.pulsar.client.admin.PulsarAdmin;
21+
import org.apache.pulsar.client.admin.PulsarAdminException;
2022
import org.apache.pulsar.client.api.*;
2123
import org.apache.pulsar.shade.com.google.gson.JsonParser;
2224
import org.slf4j.Logger;
@@ -50,6 +52,10 @@ public class CanalPulsarMQProducer extends AbstractMQProducer implements CanalMQ
5052
* pulsar客户端,管理连接
5153
*/
5254
protected PulsarClient client;
55+
/**
56+
* Pulsar admin 客户端
57+
*/
58+
protected PulsarAdmin pulsarAdmin;
5359

5460
@Override
5561
public void init(Properties properties) {
@@ -61,17 +67,28 @@ public void init(Properties properties) {
6167

6268
// 初始化连接客户端
6369
try {
64-
client = PulsarClient.builder()
70+
ClientBuilder builder = PulsarClient.builder()
6571
// 填写pulsar的连接地址
66-
.serviceUrl(pulsarMQProducerConfig.getServerUrl())
67-
// 角色权限认证的token
68-
.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()))
69-
.build();
72+
.serviceUrl(pulsarMQProducerConfig.getServerUrl());
73+
if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getRoleToken())) {
74+
// 角色权限认证的token
75+
builder.authentication(AuthenticationFactory.token(pulsarMQProducerConfig.getRoleToken()));
76+
}
77+
client = builder.build();
7078
} catch (PulsarClientException e) {
7179
throw new RuntimeException(e);
7280
}
73-
// 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
7481

82+
// 初始化Pulsar admin
83+
if(StringUtils.isNotEmpty(pulsarMQProducerConfig.getAdminServerUrl())) {
84+
try {
85+
pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarMQProducerConfig.getAdminServerUrl()).build();
86+
} catch (PulsarClientException e) {
87+
throw new RuntimeException(e);
88+
}
89+
}
90+
91+
// 加载所有生产者 --> topic可能有正则或表名,无法确认所有topic,在使用时再加载
7592
int parallelPartitionSendThreadSize = mqProperties.getParallelSendThreadSize();
7693
sendPartitionExecutor = new ThreadPoolExecutor(parallelPartitionSendThreadSize,
7794
parallelPartitionSendThreadSize,
@@ -106,6 +123,10 @@ private void loadPulsarMQProperties(Properties properties) {
106123
if (!StringUtils.isEmpty(topicTenantPrefix)) {
107124
tmpProperties.setTopicTenantPrefix(topicTenantPrefix);
108125
}
126+
String adminServerUrl = PropertiesUtils.getProperty(properties, PulsarMQConstants.PULSARMQ_ADMIN_SERVER_URL);
127+
if(!StringUtils.isEmpty(adminServerUrl)) {
128+
tmpProperties.setAdminServerUrl(adminServerUrl);
129+
}
109130
if (logger.isDebugEnabled()) {
110131
logger.debug("Load pulsar properties ==> {}", JSON.toJSON(this.mqProperties));
111132
}
@@ -182,6 +203,11 @@ public void send(final MQDestination destination, String topicName, com.alibaba.
182203
if (partitionNum == null) {
183204
partitionNum = destination.getPartitionsNum();
184205
}
206+
// 创建多分区topic
207+
if(pulsarAdmin!=null && partitionNum!=null && partitionNum>0 && PRODUCERS.get(topicName)==null) {
208+
createMultipleTopic(topicName, partitionNum);
209+
}
210+
185211
ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
186212
// 并发构造
187213
MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
@@ -319,22 +345,41 @@ private void sendMessage(String topic, int partition, List<FlatMessage> flatMess
319345
}
320346

321347
/**
322-
* 获取指定topic的生产者,并且使用缓存
323-
*
348+
* 创建多分区topic
324349
* @param topic
325-
* @return org.apache.pulsar.client.api.Producer<byte [ ]>
326-
* @date 2021/9/10 11:21
327-
* @author chad
328-
* @since 1 by chad at 2021/9/10 新增
350+
* @param partitionNum
351+
*/
352+
private void createMultipleTopic(String topic,Integer partitionNum) {
353+
// 拼接topic前缀
354+
PulsarMQProducerConfig pulsarMQProperties = (PulsarMQProducerConfig) this.mqProperties;
355+
String prefix = pulsarMQProperties.getTopicTenantPrefix();
356+
String fullTopic = topic;
357+
if (!StringUtils.isEmpty(prefix)) {
358+
if (!prefix.endsWith("/")) {
359+
fullTopic = "/" + fullTopic;
360+
}
361+
fullTopic = pulsarMQProperties.getTopicTenantPrefix() + fullTopic;
362+
}
363+
364+
// 创建分区topic
365+
try {
366+
pulsarAdmin.topics().createPartitionedTopic(fullTopic, partitionNum);
367+
} catch (PulsarAdminException e) {
368+
// TODO 无论是否报错,都继续后续的操作,此处不进行阻塞
369+
}
370+
}
371+
/**
372+
* 获取topic
373+
* @param topic
374+
* @return
329375
*/
330376
private Producer<byte[]> getProducer(String topic) {
331377
Producer producer = PRODUCERS.get(topic);
332-
333-
if (null == producer) {
378+
if (null == producer || !producer.isConnected()) {
334379
try {
335380
synchronized (PRODUCERS) {
336381
producer = PRODUCERS.get(topic);
337-
if (null != producer) {
382+
if (null != producer && producer.isConnected()) {
338383
return producer;
339384
}
340385

@@ -405,7 +450,7 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
405450

406451
@Override
407452
public void stop() {
408-
logger.info("## Stop RocketMQ producer##");
453+
logger.info("## Stop PulsarMQ producer##");
409454

410455
for (Producer p : PRODUCERS.values()) {
411456
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pulsarmq=com.alibaba.otter.canal.connector.pulsarmq.consumer.CanalPulsarMQConsumer

pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@
260260
<dependency>
261261
<groupId>com.alibaba</groupId>
262262
<artifactId>druid</artifactId>
263-
<version>1.2.6</version>
263+
<version>1.2.8</version>
264264
</dependency>
265265
<dependency>
266266
<groupId>com.lmax</groupId>
@@ -346,6 +346,11 @@
346346
<artifactId>pulsar-client</artifactId>
347347
<version>2.8.1</version>
348348
</dependency>
349+
<dependency>
350+
<groupId>org.apache.pulsar</groupId>
351+
<artifactId>pulsar-client-admin</artifactId>
352+
<version>2.8.1</version>
353+
</dependency>
349354
</dependencies>
350355
</dependencyManagement>
351356

0 commit comments

Comments
 (0)