Skip to content

Commit 8ef89dd

Browse files
Smile-cmdwanghengClass
authored andcommitted
增加rabbitmq配置,以持久化方式投递消息 (alibaba#4644)
Co-authored-by: wangheng <760261296@qq.com>
1 parent 68e0df4 commit 8ef89dd

File tree

3 files changed

+48
-6
lines changed

3 files changed

+48
-6
lines changed

connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ public class RabbitMQConstants {
1515
public static final String RABBITMQ_VIRTUAL_HOST = ROOT + "." + "virtual.host";
1616
public static final String RABBITMQ_USERNAME = ROOT + "." + "username";
1717
public static final String RABBITMQ_PASSWORD = ROOT + "." + "password";
18+
public static final String RABBITMQ_QUEUE = ROOT + "." + "queue";
19+
public static final String RABBITMQ_ROUTING_KEY = ROOT + "." + "routingKey";
20+
public static final String RABBITMQ_DELIVERY_MODE = ROOT + "." + "deliveryMode";
1821

1922
public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "rabbitmq.resource.ownerId";
2023
}

connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQProducerConfig.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ public class RabbitMQProducerConfig extends MQProperties {
1515
private String exchange;
1616
private String username;
1717
private String password;
18+
private String queue;
19+
private String routingKey;
20+
private String deliveryMode;
1821

1922
public String getHost() {
2023
return host;
@@ -55,4 +58,28 @@ public String getPassword() {
5558
public void setPassword(String password) {
5659
this.password = password;
5760
}
61+
62+
public String getQueue() {
63+
return queue;
64+
}
65+
66+
public void setQueue(String queue) {
67+
this.queue = queue;
68+
}
69+
70+
public String getRoutingKey() {
71+
return routingKey;
72+
}
73+
74+
public void setRoutingKey(String routingKey) {
75+
this.routingKey = routingKey;
76+
}
77+
78+
public String getDeliveryMode() {
79+
return deliveryMode;
80+
}
81+
82+
public void setDeliveryMode(String deliveryMode) {
83+
this.deliveryMode = deliveryMode;
84+
}
5885
}

connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.Properties;
77
import java.util.concurrent.TimeoutException;
88

9+
import com.rabbitmq.client.*;
910
import org.apache.commons.lang.StringUtils;
1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
@@ -26,10 +27,6 @@
2627
import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
2728
import com.alibaba.otter.canal.protocol.FlatMessage;
2829
import com.alibaba.otter.canal.protocol.Message;
29-
import com.rabbitmq.client.AlreadyClosedException;
30-
import com.rabbitmq.client.Channel;
31-
import com.rabbitmq.client.Connection;
32-
import com.rabbitmq.client.ConnectionFactory;
3330

3431
/**
3532
* RabbitMQ Producer SPI 实现
@@ -75,7 +72,10 @@ public void init(Properties properties) {
7572
try {
7673
connect = factory.newConnection();
7774
channel = connect.createChannel();
78-
// channel.exchangeDeclare(mqProperties.getExchange(), "topic");
75+
channel.queueDeclare(rabbitMQProperties.getQueue(), true, false, false, null);
76+
channel.exchangeDeclare(rabbitMQProperties.getExchange(), rabbitMQProperties.getDeliveryMode(), true, false, false, null);
77+
channel.queueBind(rabbitMQProperties.getQueue(), rabbitMQProperties.getExchange(), rabbitMQProperties.getRoutingKey());
78+
7979
} catch (IOException | TimeoutException ex) {
8080
throw new CanalException("Start RabbitMQ producer error", ex);
8181
}
@@ -106,6 +106,18 @@ private void loadRabbitMQProperties(Properties properties) {
106106
if (!StringUtils.isEmpty(password)) {
107107
rabbitMQProperties.setPassword(password);
108108
}
109+
String queue = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_QUEUE);
110+
if (!StringUtils.isEmpty(queue)) {
111+
rabbitMQProperties.setQueue(queue);
112+
}
113+
String routingKey = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_ROUTING_KEY);
114+
if (!StringUtils.isEmpty(routingKey)) {
115+
rabbitMQProperties.setRoutingKey(routingKey);
116+
}
117+
String deliveryMode = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_DELIVERY_MODE);
118+
if (!StringUtils.isEmpty(deliveryMode)) {
119+
rabbitMQProperties.setDeliveryMode(deliveryMode);
120+
}
109121
}
110122

111123
@Override
@@ -165,7 +177,7 @@ private void sendMessage(String queueName, byte[] message) {
165177
// tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
166178
try {
167179
RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties;
168-
channel.basicPublish(rabbitMQProperties.getExchange(), queueName, null, message);
180+
channel.basicPublish(rabbitMQProperties.getExchange(), queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message);
169181
} catch (Throwable e) {
170182
throw new RuntimeException(e);
171183
}

0 commit comments

Comments
 (0)