|
6 | 6 | import java.util.Properties; |
7 | 7 | import java.util.concurrent.TimeoutException; |
8 | 8 |
|
| 9 | +import com.rabbitmq.client.*; |
9 | 10 | import org.apache.commons.lang.StringUtils; |
10 | 11 | import org.slf4j.Logger; |
11 | 12 | import org.slf4j.LoggerFactory; |
|
26 | 27 | import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig; |
27 | 28 | import com.alibaba.otter.canal.protocol.FlatMessage; |
28 | 29 | import com.alibaba.otter.canal.protocol.Message; |
29 | | -import com.rabbitmq.client.AlreadyClosedException; |
30 | | -import com.rabbitmq.client.Channel; |
31 | | -import com.rabbitmq.client.Connection; |
32 | | -import com.rabbitmq.client.ConnectionFactory; |
33 | 30 |
|
34 | 31 | /** |
35 | 32 | * RabbitMQ Producer SPI 实现 |
@@ -75,7 +72,10 @@ public void init(Properties properties) { |
75 | 72 | try { |
76 | 73 | connect = factory.newConnection(); |
77 | 74 | channel = connect.createChannel(); |
78 | | - // channel.exchangeDeclare(mqProperties.getExchange(), "topic"); |
| 75 | + channel.queueDeclare(rabbitMQProperties.getQueue(), true, false, false, null); |
| 76 | + channel.exchangeDeclare(rabbitMQProperties.getExchange(), rabbitMQProperties.getDeliveryMode(), true, false, false, null); |
| 77 | + channel.queueBind(rabbitMQProperties.getQueue(), rabbitMQProperties.getExchange(), rabbitMQProperties.getRoutingKey()); |
| 78 | + |
79 | 79 | } catch (IOException | TimeoutException ex) { |
80 | 80 | throw new CanalException("Start RabbitMQ producer error", ex); |
81 | 81 | } |
@@ -106,6 +106,18 @@ private void loadRabbitMQProperties(Properties properties) { |
106 | 106 | if (!StringUtils.isEmpty(password)) { |
107 | 107 | rabbitMQProperties.setPassword(password); |
108 | 108 | } |
| 109 | + String queue = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_QUEUE); |
| 110 | + if (!StringUtils.isEmpty(queue)) { |
| 111 | + rabbitMQProperties.setQueue(queue); |
| 112 | + } |
| 113 | + String routingKey = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_ROUTING_KEY); |
| 114 | + if (!StringUtils.isEmpty(routingKey)) { |
| 115 | + rabbitMQProperties.setRoutingKey(routingKey); |
| 116 | + } |
| 117 | + String deliveryMode = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_DELIVERY_MODE); |
| 118 | + if (!StringUtils.isEmpty(deliveryMode)) { |
| 119 | + rabbitMQProperties.setDeliveryMode(deliveryMode); |
| 120 | + } |
109 | 121 | } |
110 | 122 |
|
111 | 123 | @Override |
@@ -165,7 +177,7 @@ private void sendMessage(String queueName, byte[] message) { |
165 | 177 | // tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey |
166 | 178 | try { |
167 | 179 | RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties; |
168 | | - channel.basicPublish(rabbitMQProperties.getExchange(), queueName, null, message); |
| 180 | + channel.basicPublish(rabbitMQProperties.getExchange(), queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message); |
169 | 181 | } catch (Throwable e) { |
170 | 182 | throw new RuntimeException(e); |
171 | 183 | } |
|
0 commit comments